Python开发-043_Python操作Mysql数据库

前文题到过,mysql是可以使用任何支持sql语句和网络连接的软件进行连接的,恰好python作为编程语言也能胜任mysql客户端的任务,并且发送的指令都是相同的

# 注意 需要开启防火墙3306端口
firewall-cmd --permanent --add-port=3306/tcp
# 注意 需要用户开启远程访问权限
grant all privileges on *.* to 'root'@'%' identified by '123456' with grant option;
# 刷新配置 
flush privileges;
# 注意 账号密码即使是纯数字也需要转为字符串格式

Python提供了一个函数库专门用于操作mysql

pip3 install pymysql

import pymysql

1 连接数据库

conn = pymysql.connect(
    host='192.168.0.115',
    port=3306,
    user='root',
    password = '123456',
    charset='utf8'
) # 连接数据库

cursor = conn.cursor()  # 产生一个游标对象(用于执行命令)

2 数据库操作

2.1 查看数据库

cursor.execute('show databases')
# 获取指令结果
result = cursor.fetchall()	# .fetchall查看全部结果
print(result)
'''输出结果
(('information_schema',), ('day47',), ('db1',), ('demo',), ('demo2',), ('log',), ('mysql',), ('performance_schema',), ('school',), ('test',), ('test_demo',), ('website',))
'''

2.2 创建数据库

# 创建数据库
cursor.execute("create database db_python default charset utf8 collate utf8_general_ci")
conn.commit()   # 涉及数据修改的操作都需要使用conn.commit()确认

补充知识:

collate utf8_general_ci是数据库校对规则

cicase insensitive的缩写,意思是大小写不敏感;相对的是cs,即case sensitive,大小写敏感;还有一种是utf8_bin,是将字符串中的每一个字符用二进制数据存储,区分大小写

如果建表的时候选择的是区别大小写的规则而查询的时候又暂时不想区别,可以用类似WHERE column_name COLLATE utf8_general_ci = 'xxx'的写法改变查询使用的校对规则,新建数据库时一般选用utf8_general_ci

2.3 删除数据库

cursor.execute('drop database db_python')
conn.commit()

2.4 进入数据库

cursor.execute("use db4")

2.5 关闭连接

# 关闭连接
cursor.close()
conn.close()

3 数据表操作

在进行数据库连接时,添加字段database='数据库名'可以指定连接到某一具体数据库

conn = pymysql.connect(
    host='192.168.0.115',
    port=3306,
    user='root',
    password = '123456',
    database='school', # 指定数据库
    charset='utf8'
)

3.1 创建数据表

cursor.execute("""
create table demo(
    id int primary key,
    name varchar(16) not null,
    email varchar(32) null
)default charset = utf8;
""")
conn.commit()

3.2 查看数据表

cursor.execute('show tables')
# 获取指令结果
result = cursor.fetchall() # 查看全部结果
print(result)

3.3 删除数据表

cursor.execute('drop table demo')
conn.commit()

4 数据相关

在增删改查中,查是不会对数据进行修改,而增删改则会,他们是需要进行二次确认的

conn = pymysql.connect(
    host='192.168.0.115',
    port=3306,
    user='root',
    password='123456',
    database='school',
    autocommit = True, # 自动进行确认
    charset='utf8'
)

4.1 读取数据

cursor.execute('select * from student')
result = cursor.fetchall()
print(result)

默认输出结果为元组,并不方便调用

((1, '钢蛋', '女', 1), (2, '铁锤', '女', 1), (3, '山炮', '男', 2), (4, '铁牛', '男', 2))

可以通过修改游标对象cursor = conn.cursor()参数的方式将其变为字典

cursor = conn.cursor(cursor=pymysql.cursors.DictCursor )

结果就将改变为

[{'sid': 1, 'sname': '钢蛋', 'gender': '女', 'class_id': 1}, {'sid': 2, 'sname': '铁锤', 'gender': '女', 'class_id': 1}, {'sid': 3, 'sname': '山炮', 'gender': '男', 'class_id': 2}, {'sid': 4, 'sname': '铁牛', 'gender': '男', 'class_id': 2}]

4.2 插入数据

cursor.execute("insert into student(sname,gender,class_id) values ('大爷','女',2)")

pymysql库支持同时插入多条数据

sql = 'insert into student(sname,gender,class_id) values(%s,%s,%s)'
rows = cursor.executemany(sql,[('aaa','男',1),('bbb','男',1),('ccc','男',1)])

4.3 修改数据

cursor.execute("update student set gender='男' where sname='大爷'")

4.4 删除数据

cursor.execute("delete from student where sname='大爷'")

5 游标对象

既然提到了数据读取,就回顾到了文件读取的重要概念,游标对象

cursor.fetchone() # 只拿一条数据
cursor.fetchmany(2) # 自定义那几条数据
cursor.fetchall() # 拿全部的数据

做个小测试

cursor.execute('select * from student')
result = cursor.fetchone()
print(result)
'''
输出结果 {'sid': 1, 'sname': '钢蛋', 'gender': '女', 'class_id': 1}
'''
result = cursor.fetchmany(2)
print(result)	# 自动读取第二条和第三条 没有读取第一条
'''
输出结果 [{'sid': 2, 'sname': '铁锤', 'gender': '女', 'class_id': 1}, {'sid': 3, 'sname': '山炮', 'gender': '男', 'class_id': 2}]
'''

它与文件读取相同,本质有一个光标的概念,在读取某条数据后,会移动光标,继续往后读取数据

它也与文件读取相同,光标是可以移动的

# 相对移动
cursor.scroll(1,'relative') # 相对光标当前位置往后移1位
# 绝对移动
cursor.scroll(2,'absolute') # 相对光标起始位置往后移2位

# 测试
cursor.execute('select * from student')
print(cursor.fetchone())    # 输出sid=1的数据 光标位于sid=1之后
cursor.scroll(2,'relative') # 往后移动两位光标 光标位于sid=3之后
print(cursor.fetchone())    # 输出sid=4的数据 光标位于sid=4之后
cursor.scroll(2,'absolute') # 光标起始位置后移两位 位于sid=2之后
print(cursor.fetchone())    # 输出sid=3的数据 光标位于sid=3之后

6 存储过程调用

存储过程最大的麻烦就是如果形参可以被修改数值,则我们必须定义mysql变量进行传值,pymysql帮我们很好地解决了这个问题

import pymysql

conn = pymysql.connect(
    host='192.168.0.115',
    port=3306,
    user='root',
    passwd='123456',
    db='school',
    charset='utf8',
    autocommit=True
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.callproc('p1',(1,5,10))
# 这里不需要传变量的原因是被pymysql优化了
print(cursor.fetchall())

'''pymysql的callproc函数会将参数转换为
@_p1_0=1
@_p1_1=5
@_p1_2=10
变量进行传入
'''

cursor.execute('select @_p1_2;')
print(cursor.fetchall()) # [{'@_p1_2': 666}] 这里结果已经被存储过程修改过了

7 事务

import pymysql
conn = pymysql.connect(
    host='192.168.0.115',
    port=3306,
    user='root',
    password='123456',
    charset='utf8',
    db='log'
)
cursor = conn.cursor()
conn.begin() # 开启事务
# 不报错 则 else 确认 报错 则 回滚
try:
    cursor.execute("update user set balance=1000 where name ='aaa'")
    cursor.execute("update user set balance=1000 where name ='bbb'")
except Exception as e:
    conn.rollback()
else:
    conn.commit()
cursor.close()
conn.close()

8 锁的应用

应用场景:总共100件商品,每次购买一件需要让商品个数减1

A: 访问页面查看商品剩余 100
B: 访问页面查看商品剩余 100

此时 A、B 同时下单,那么他们同时执行SQL:
	update goods set count=count-1 where id=3
由于Innodb引擎内部会加锁,所以他们两个即使同一时刻执行,内部也会排序逐步执行。


但是,当商品剩余 1个时,就需要注意了。
A: 访问页面查看商品剩余 1
B: 访问页面查看商品剩余 1

此时 A、B 同时下单,那么他们同时执行SQL:
	update goods set count=count-1 where id=3
这样剩余数量就会出现 -1,很显然这是不正确的,所以应该怎么办呢?


这种情况下,可以利用 排它锁,在更新之前先查询剩余数量,只有数量 >0 才可以购买,所以,下单时应该执行:
	begin; -- start transaction;
	select count from goods where id=3 for update;
	-- 获取个数进行判断
	if 个数>0:
		update goods set count=count-1 where id=3;
	else:
		-- 已售罄
	commit;

为了让python模拟,我们创建了一个表格,模拟出了aaa bbb ccc三件物品

create table goods(
	id int primary key auto_increment,
    name varchar(32) not null,
    count int not null
);
insert into goods(name,count) values('aaa',1000),('bbb',1000),('ccc',1000);

使用python代码的多线程模拟同时购买的情况

import pymysql
import threading

def task():
    conn = pymysql.connect(
        host='192.168.0.115',
        port=3306,
        user='root',
        passwd='123456',
        charset='utf8',
        db='website'
    )
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    # 开启事务
    conn.begin()    # 2.开启事务只能等conn.close()完成事务 才能解锁
    cursor.execute("select id,count from goods where id=2 for update ") # 3.上锁
    result = cursor.fetchone()  # 只输出一行
    cursor_count = result['count']
    if cursor_count>0:
        # 如果库存大于0 则显示当前库存并且-1 表示被买走
        print('当前库存{}'.format(cursor_count))
        cursor.execute("update goods set count=count-1 where id=2")
    else:
        print('已售罄')
    conn.commit()
    cursor.close()
    conn.close()

def run():
    for i in range(5):
        # 1.使用多线程模拟多人同时操作
        t = threading.Thread(target=task)
        t.start()

if __name__ == '__main__':
    run()

9 数据库连接池

每次申请数据库连接都会进行握手挥手操作,他会占用内存带宽,同时使用数据库的人太多,可能导致数据库服务器直接因为连接占用内存太多而宕机

使用数据库连接池,可以有效限制连接数和减少申请数据库连接的次数

import pymysql
import threading
from dbutils.pooled_db import PooledDB

MYSQL_DB_POOL = PooledDB(
    # 连接池配置
    creator=pymysql, # 使用pymysql模块连接数据库
    maxconnections=5, # 连接池最大允许连接数
    mincached=2, # 初始化连接池创建的空闲连接,0表示不创建
    maxcached=3, # 连接池中最多空闲的连接,超过则销毁连接
    blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    # 如:0 = None = never, 1 = default = whenever it is requested,
    # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='192.168.0.115',
    port=3306,
    user='root',
    password='123456',
    database='website',
    charset='utf8'
)

def task():
    conn = MYSQL_DB_POOL.connection() # 从连接池获取一个连接
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute('select sleep(2)') # 等待2秒
    result = cursor.fetchall()
    print(result)
    cursor.close() # 关闭游标
    conn.close() # 将连接交还给连接池

def run():
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()

if __name__ == '__main__':
    run()

10 SQL工具类

基于数据库连接池开发一个公共的SQL操作类,方便后续直接操作数据库

10.1 单例模式

db_conn.py

import pymysql
from dbutils.pooled_db import PooledDB

class DBHelper(object):
    def __init__(self):
        # 基本数据库配置
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=5,
            mincached=2,
            maxcached=3,
            blocking=True,
            setsession=[],
            ping=0,
            host='192.168.0.115',
            port=3306,
            user='root',
            passwd='123456',
            database='website',
            charset='utf8'
        )
    def get_conn_cursor(self):
        # 运行连接池 返回资源池连接和它的运行游标对象
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn,cursor
    def close_conn_cursor(self,*args):
        # *args会接受 资源池连接和它的运行游标对象
        for item in args:
            # 将他们分别是释放掉
            item.close()
    def exec(self,sql,**kwargs):
        # 命令执行
        conn,cursor = self.get_conn_cursor()
        # conn获取sql连接
        # cursor获取游标对象
        cursor.execute(sql,kwargs)
        # sql语句使用sql进行传入
        # 需要一次性插入多个的参数用kwargs
        conn.commit()
        self.close_conn_cursor(conn,cursor) # 关闭连接释放资源
    def fetch_one(self,sql,**kwargs):
        # 命令执行 附带返回当前行
        conn,cursor = self.get_conn_cursor()
        cursor.execute(sql,kwargs)
        result = cursor.fetchone()
        self.close_conn_cursor(conn,cursor)
        return result
    def fetch_all(self,sql,**kwargs):
        conn,cursor = self.get_conn_cursor()
        cursor.execute(sql,kwargs)
        result = cursor.fetchall()
        self.close_conn_cursor(conn,cursor)
        return result

db = DBHelper()

此类的单例模式是通过在类文件中,自我调用一次实现的

只需要导入该文件 后续执行db.函数名即可

test.py

from db_conn import db

# 打印当前数据库
result = db.fetch_all('select * from goods')
print(result)

# 打印id=2的数据
result = db.fetch_all('select * from goods where id=%(id)s',id=2)
print(result)

# 插入数据
db.exec("insert into goods(name,count) values (%(name)s,%(count)s)",name='ddd',count='1000')

10.2 上下文管理

上下文管理需要使用__enter____exit__

db_conn2.py

import pymysql
from dbutils.pooled_db import PooledDB

# 连接使用全局变量
POOL = PooledDB(
    creator=pymysql,
    maxconnections=5,
    mincached=2,
    maxcached=3,
    blocking=True,
    setsession=[],
    ping=0,
    host='192.168.0.115',
    port=3306,
    user='root',
    passwd='123456',
    database='website',
    charset='utf8'
)

class Connect(object):
    def __init__(self):
        # 申请连接池连接
        self.conn = POOL.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
    def __enter__(self):
        return self # with 会运行 __enter__
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()	# __exit__ 会因为with完成而关闭
        self.conn.close()
    def exec(self,sql,**kwargs):
        self.cursor.execute(sql,kwargs)
        self.conn.commit()
    def fetch_one(self,sql,**kwargs):
        self.cursor.execute(sql,kwargs)
        result = self.cursor.fetchone()
        return result
    def fetch_all(self,sql,**kwargs):
        self.cursor.execute(sql,kwargs)
        result = self.cursor.fetchall()
        return result

test2.py

from db_conn2 import Connect

with Connect() as obj:
    ret = obj.fetch_one("select * from goods")
    print(ret)

    ret = obj.fetch_all("select * from goods where id > %(id)s",id=3)
    print(ret)

    obj.exec("insert into goods(name,count) values (%(name)s,%(count)s)",name='eee',count='1111')

Python开发-043_Python操作Mysql数据库
http://localhost:8080/archives/qN7UksFw
作者
kinght
发布于
2024年11月11日
更新于
2024年11月11日
许可协议