Python工具类
本文最后更新于:2025年4月29日 下午
使用 pyinstaller 打包
在 pycharm 中点击新建环境
进入文件后,会有一个文件夹venv
,用来存放管理的库文件
创建 requirements.txt
pip install requests
pip freeze > requirements.txt
可以看到虚拟环境下安装的包
安装 pyinstaller
pip install pyinstaller
安装完成后将终端关闭,重新打开
我们输入pyinstaller
,当出现一下结果,说明安装成功
单文件打包
pyinstaller -D app.py
开始打包
之后在当前文件夹会出现三个文件
- build:中间编译时,产出的文件
- dist:打包生成后的文件目录
- xxx.spec:打包的配置文件
在dist
中可以看到有我们上面起的名字app
,只需将app 文件夹发给他人,打开里面的app.exe文件,即可使用
文件读写
with open('xxx.txt', 'w', encoding='utf-8') as f:
f.write('xxx' + '\n')
f.close()
爬虫
代码案例
from bs4 import BeautifulSoup
import requests
def getHTMLText(url):
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
res = requests.get(url, headers=headers)
res.raise_for_status()
res.encoding = res.apparent_encoding
return res.text
except:
return 'error'
def get_info(url):
demo = getHTMLText(url)
soup = BeautifulSoup(demo, 'html.parser')
table = soup.find('table', {'class': 'rk-table'})
tbody = table.find('tbody')
rows = tbody.find_all('tr')
for row in rows:
tds = row.find_all('td')
rank = tds[0].text.strip()
name = row.find('a').text.strip()
location = tds[2].text.strip()
total = tds[4].text.strip()
data = {
'排名': rank,
'学校名称': name,
'省市': location,
'总分': total
}
print(data)
url_2020 = 'https://www.shanghairanking.cn/rankings/bcur/2020'
url_2021 = 'https://www.shanghairanking.cn/rankings/bcur/2021'
get_info(url_2021)
爬虫框架 Scrapy
- 简介
基本功能
Scrapy 是一个适用爬取网站数据、提取结构性数据的应用程序框架,它可以应用在广泛领域:Scrapy 常应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序中。通常我们可以很简单的通过 Scrapy 框架实现一个爬虫,抓取指定网站的内容或图片。架构
**Scrapy Engine(引擎)**:负责 Spider、ItemPipeline、Downloader、Scheduler 中间的通讯,信号、数据传递等。
**Scheduler(调度器)**:它负责接受引擎发送过来的 Request 请求,并按照一定的方式进行整理排列,入队,当引擎需要时,交还给引擎。
Downloader(下载器):负责下载 Scrapy Engine(引擎)发送的所有 Requests 请求,并将其获取到的 Responses 交还给 Scrapy Engine(引擎),由引擎交给 Spider 来处理。
Spider(爬虫):它负责处理所有 Responses,从中分析提取数据,获取 Item 字段需要的数据,并将需要跟进的 URL 提交给引擎,再次进入 Scheduler(调度器)。
**Item Pipeline(管道)**:它负责处理 Spider 中获取到的 Item,并进行进行后期处理(详细分析、过滤、存储等)的地方。
Downloader Middlewares(下载中间件):一个可以自定义扩展下载功能的组件。
Spider Middlewares(Spider 中间件):一个可以自定扩展和操作引擎和 Spider 中间通信的功能组件。
scrapy 项目的结构
项目名字 项目的名字 spiders 文件夹(存储的是爬虫文件) init 自定义的爬虫文件 核心功能文件 init items 定义数据结构的地方 爬虫的数据都包含哪些 middleware 中间件 代理 pipelines 管道 用来处理下载的数据 settings 配置文件 robots 协议 ua 定义等
- 环境搭建
pip install scrapy
- 项目搭建
scrapy startproject 工程名(img)
结果
New Scrapy project 'img', using template directory 'D:\Environment\Anaconda3\Lib\site-packages\scrapy\templates\project', created in:
C:\Users\Admin\Desktop\scrapy\img
You can start your first spider with:
cd img
scrapy genspider example example.com
转到项目,创建源文件
cd img
scrapy genspider unsplash unsplash.com
结果
Created spider 'unsplash' using template 'basic' in module:
img.spiders.unsplash
运行
scrapy crawl unsplash(为文件名)
Scrapy 爬取 unsplash
一、分析网站
高清图片网站https://unsplash.com/, 能展示超过 7w+张高清图片. 浏览时, 其通过 API 返回图片的 URl
在 chrome 浏览器中有此插件 unsplash, 在插件文件中找到对应 JS, 再找出 api 地址
根据插件安装的时间找到对应的 chrome 插件目录
二、爬取图片 URL
安装 Scrapy, pip install Scrapy
在工程目录中创建项目 scrapy startproject scrapy_unsplash E:\wendi1\pyplace\scrapy_unsplash
进入项目目录, 创建爬虫 scrapy genspider unsplash api.unsplash.com –template=crawl
- 此时在./scrapy_unsplash/spiders 目录中生成 unsplash.py 文件.
- 配置 settings.py , 并发连接数 CONCURRENT_REQUESTS = 100 , 下载延迟 DOWNLOAD_DELAY = 1.6
首先需要爬取网站, 然后将 raw 种类的图片 URL 存入 sqlite, unsplash.py 代码:
# -*- coding: utf-8 -*-
import json
import sqlite3
import threading
import scrapy
from scrapy.spiders import CrawlSpider
class UnsplashSpider(CrawlSpider):
name = 'unsplash'
allowed_domains = ['api.unsplash.com']
def start_requests(self):
createDB() # 创建数据库
start, page = 1, 2000, # 要爬的页面数
for i in range(start, page + 1): # 从第一页开始
url = 'https://api.unsplash.com/photos/?client_id=fa60305aa82e74134cabc7093ef54c8e2c370c47e73152f72371c828daedfcd7&page=' + str(
i) + '&per_page=30'
yield scrapy.Request(url=url, callback=self.parse_item)
def parse_item(self, response):
conn = sqlite3.connect("E:\\wendi1\\pyplace\\scrapy_unsplash\\database\\link.db") # 连接数据库
print('-------------------')
js = json.loads(str(response.body_as_unicode()), 'utf-8') # 读取响应body,并转化成可读取的json
for j in js:
link = j["urls"]["raw"]
sql = "INSERT INTO LINK(LINK) VALUES ('%s');" % link # 将link插入数据库
conn.execute(sql)
semaphore = threading.Semaphore(1) # 引入线程信号量,避免写入数据库时死锁
semaphore.acquire() # P操作
conn.commit() # 写入数据库,此时数据库文件独占
semaphore.release() # V操作
def createDB(): # 创建数据库
conn = sqlite3.connect("E:\\wendi1\\pyplace\\scrapy_unsplash\\database\\link.db") # Sqlite是一个轻量数据库,不占端口,够用
conn.execute("DROP TABLE IF EXISTS LINK;") # 重新运行删掉数据库
conn.execute("CREATE TABLE LINK (" # 创建属性ID:主键自增;属性LINK:存放图片链接
"ID INTEGER PRIMARY KEY AUTOINCREMENT,"
"LINK VARCHAR(255));")
执行 scrapy crawl unsplash . 在第 16 行可修改爬取的页面总数
url 已存入 sqlite 中 :
连接 Mysql 数据库
使用pymysql
官方库,封装公用代码
新增一个pymysql_comm.py
类, 代码如下:
import pymysql
from timeit import default_timer
host = 'localhost'
port = 3306
user = 'root'
password = '123456'
db = 'mysql_test' # 连接的数据库
charset = 'utf8mb4'
# ---- 用pymysql 操作数据库
def get_connection():
conn = pymysql.connect(host=host, port=port, db=db, user=user, password=password, charset=charset)
return conn
# ---- 使用 with 的方式来优化代码
class UsingMysql(object):
def __init__(self, commit=True, log_time=True, log_label='总用时'):
"""
:param commit: 是否在最后提交事务(设置为False的时候方便单元测试)
:param log_time: 是否打印程序运行总时间
:param log_label: 自定义log的文字
"""
self._log_time = log_time
self._commit = commit
self._log_label = log_label
def __enter__(self):
# 如果需要记录时间
if self._log_time is True:
self._start = default_timer()
# 在进入的时候自动获取连接和cursor
conn = get_connection()
cursor = conn.cursor()
# cursor = conn.cursor(pymysql.cursors.DictCursor) 可使返回的结果不再是普通的元组列表,而是字典列表
conn.autocommit = False
self._conn = conn
self._cursor = cursor
return self
def __exit__(self, *exc_info):
# 提交事务
if self._commit:
self._conn.commit()
# 在退出的时候自动关闭连接和cursor
self._cursor.close()
self._conn.close()
if self._log_time is True:
diff = default_timer() - self._start
print('-- %s: %.6f 秒' % (self._log_label, diff))
@property
def cursor(self):
return self._cursor
当我们需要使用时,增删改查演示:
先增加了一条记录, 然后接着查看这条记录
from pymysql_comm import UsingMysql
def select_one(cursor):
cursor.execute("select * from Product")
data = cursor.fetchone()
print("-- 单条记录: {0} ".format(data))
# 新增单条记录
def create_one():
with UsingMysql(log_time=True) as um:
sql = "insert into Product(name, remark) values(%s, %s)"
params = ('男士双肩背包1', '这个是非常好的背包')
um.cursor.execute(sql, params)
# 查看结果
select_one(um.cursor)
if __name__ == '__main__':
create_one()
删除某条记录
from pymysql_comm import UsingMysql
def delete_one(cursor, name):
sql = 'delete from Product where name = %s'
params = name
cursor.execute(sql, params)
print('--- 已删除名字为%s的商品. ' % name)
def select_one(cursor):
sql = 'select * from Product'
cursor.execute(sql)
data = cursor.fetchone()
print('--- 已找到名字为%s的商品. ' % data['name'])
return data['name']
def select_one_by_name(cursor, name):
sql = 'select * from Product where name = %s'
params = name
cursor.execute(sql, params)
data = cursor.fetchone()
if data:
print('--- 已找到名字为%s的商品. ' % data['name'])
else:
print('--- 名字为%s的商品已经没有了' % name)
# 删除单条记录
def check_delete_one():
with UsingMysql(log_time=True) as um:
# 查找一条记录
name = select_one(um.cursor)
# 删除
delete_one(um.cursor, name)
# 查看还在不在?
select_one_by_name(um.cursor, name)
if __name__ == '__main__':
check_delete_one()
修改记录
from pymysql_comm import UsingMysql
def update_by_pk(cursor, name, pk):
sql = "update Product set name = '%s' where id = %d" % (name, pk)
cursor.execute(sql)
def select_one(cursor):
sql = 'select * from Product'
cursor.execute(sql)
return cursor.fetchone()
def select_one_by_name(cursor, name):
sql = 'select * from Product where name = %s'
params = name
cursor.execute(sql, params)
data = cursor.fetchone()
if data:
print('--- 已找到名字为%s的商品. ' % data['name'])
else:
print('--- 名字为%s的商品已经没有了' % name)
# 修改记录
def check_update():
with UsingMysql(log_time=True) as um:
# 查找一条记录
data = select_one(um.cursor)
pk = data['id']
print('--- 商品{0}: '.format(data))
# 修改名字
new_name = '单肩包'
update_by_pk(um.cursor, new_name, pk)
# 查看
select_one_by_name(um.cursor, new_name)
if __name__ == '__main__':
check_update()
查询
from pymysql_comm import UsingMysql
def fetch_list_by_filter(cursor, pk):
sql = 'select * from Product where id > %d' % pk
cursor.execute(sql)
data_list = cursor.fetchall()
print('-- 总数: %d' % len(data_list))
return data_list
# 查找
def fetch_list():
with UsingMysql(log_time=True) as um:
# 查找id 大于800的记录
data_list = fetch_list_by_filter(um.cursor, 800)
# 查找id 大于 10000 的记录
data_list = fetch_list_by_filter(um.cursor, 10000)
if __name__ == '__main__':
fetch_list()
分页查询
from pymysql_comm import UsingMysql
def fetch_page_data(cursor, pk, page_size, skip):
sql = 'select * from Product where id > %d limit %d,%d' % (pk, skip, page_size)
cursor.execute(sql)
data_list = cursor.fetchall()
print('-- 总数: %d' % len(data_list))
print('-- 数据: {0}'.format(data_list))
return data_list
# 查找
def check_page():
with UsingMysql(log_time=True) as um:
page_size = 10
pk = 500
for page_no in range(1, 6):
print('====== 第%d页数据' % page_no)
skip = (page_no - 1) * page_size
fetch_page_data(um.cursor, pk, page_size, skip)
if __name__ == '__main__':
check_page()
爬虫案例(写入数据库)
实例
import json
import re
import pymysql
import requests
conn = pymysql.connect(host='127.0.0.1' # 连接名称,默认127.0.0.1
, user='root' # 用户名
, passwd='123456' # 密码
, port=3306 # 端口,默认为3306
, db='python' # 数据库名称
, charset='utf8' # 字符编码
)
cur = conn.cursor() # 生成游标对象
def getHTMLText(url):
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
res = requests.get(url, headers=headers)
res.raise_for_status()
res.encoding = res.apparent_encoding
return res.text
except:
return 'error'
def main(url):
res = getHTMLText(url)
regex = re.compile(r"(?=\()(.*)(?<=\))")
jsonString = regex.findall(res)[-1]
jsonString = jsonString.strip('()')
jsonData = json.loads(jsonString)
diffValue = jsonData['data']['diff']
for i in range(len(diffValue)):
demo = diffValue[i]['f12']
name = diffValue[i]['f14']
sql = "INSERT INTO money(demo,name) VALUES ('{}', '{}');".format(demo, name)
cur.execute(sql)
conn.commit()
print(demo+name)
cur.close()
conn.close()
url = 'http://85.push2.eastmoney.com/api/qt/clist/get?cb=jQuery1124007516892373587614_1682312504170&pn=1&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&wbp2u=|0|0|0|web&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152&_=1682312504458'
main(url)
图片转文字画
import numpy as np
from PIL import Image
import time
if __name__ == '__main__':
start_time = time.time()
image_file = 'lena.png'
height = 100
img = Image.open(image_file)
img_width, img_height = img.size
width = int(1.8 * height * img_width // img_height)
img = img.resize((width, height), Image.ANTIALIAS)
pixels = np.array(img.convert('L'))
print('type(pixels) = ', type(pixels))
print(pixels.shape)
print(pixels)
chars = "MNHQ$OC?7>!:-;. "
N = len(chars)
step = 256 // N
print(N)
result = ''
for i in range(height):
for j in range(width):
result += chars[pixels[i][j] // step]
result += '\n'
with open('text.txt', mode='w') as f:
f.write(result)
end_time = time.time()
elapsed_time = end_time - start_time
print(f'程序耗时:{elapsed_time:.2f} 秒')
示例
GUI
用 Python 来写 GUI 的库:pyqt、wxpython、tkinter、kivy
比较常用的是 tkinter 它是 Python 内置的库
检测是否存在
打开 cmd,输入:
python -m tkinter
开始使用
tkinter 把不同的组件都封装成了 Class
每个组件都有一些属性可以设置,比如可以设置字体常用的宽高字体颜色
import tkinter as tk
app = tk.Tk()
app.title("My App") # 定义窗口的标题
app.geometry("600x400") # 使用 geometry指定窗口的宽高
lb = tk.Label(app, text="Hello World") # 想要往里加入文本,可以使用 Label 对象
lb.pack() # 用 pack 塞到窗口中去
tk.mainloop() # 调用了 mainloop 方法,主要是让它去循环等待用户的交互
示例:
import tkinter as tk
app = tk.Tk()
app.title("Jarvis")
app.geometry("600x400")
lb = tk.Label(app, text="Hello World", width=20, height=10, fg="blue").pack()
tk.Button(app, text="点我", width=20, background='pink').pack()
tk.mainloop()
按钮可以定义点击事件:当点击按钮的时候调用方法来修改 Lable 里面的内容(command 来绑定回调函数)