看着一双美丽的眼睛流动着对那个时代的向往,不由得让人也期待啊。
创建数据库方式
- Database First # 使用navicat图形化管理工具一步一步创建
- Moedl First # 使用navicat的画图工具画出结构,然后自动生成
- Code First # 用Python写好数据库的模型,不用专门写SQL语句,主要解决创建数据的问题,专注于业务模型的设计。
模型创建与映射
模型概念
关于MVC方式,个人这么理解:
M --> model模块,主要的核心函数处理模块,负责处理大部分的业务的业务模型
V --> view模块,视图模块,负责展现出好看的界面
c --> ctrol模块,操作控制模块,负责处理控制部分
ORM:对象关系映射,包括数据创建过程,数据查询过程,删除,更新的过程。ORM操作数据库模型
来间接的操作数据库。
后期的所有数据库操作都是通过ORM来操作的。
创建妹子图项目模型
首先在数据库中创建好数据库,数据库名字为meizi,字符集为utf8mb4 – UTF-8 Unicode,排序规则为utf8mb4_general_ci
还是使用之前的妹子图的小项目,先创建目录data,然后新建creata_database.py,准备创建数据库结构,需要的数据结构大致如下
class data():
id = None
# id号
title = ''
# 页面标题
url = ''
# 该页面的主网址
show_img = ''
# 页面的单张展示图片
all_img = ''
# 页面的所有图片
def sample(self):
pass
然后使用sqlalchemy把结构直接映射到数据库,非常方便。这里提起一下,sqlalchemy不是flask自带的,但是falsk对sqlalchemy做了部分的优化,使用pipenv install flask-sqlalchemy即可安装。
ok,下面的新建的数据库模型
# -*- coding: utf-8 -*-
# @Time : 2018/7/24 0024 13:40
# @Author : Langzi
# @Blog : www.sxadmin.github.io
# @File : creata_database.py
# @Software: PyCharm
import sys
from sqlalchemy import Column,Integer,String
# 导入column(字段),Integer(数字类型)
reload(sys)
sys.setdefaultencoding('utf-8')
class data():
id = Column(Integer,primary_key=True,autoincrement=True)
# 设置id号为数据库的主键,并且自增长
# 相当于SQL:id int primary key auto_increment
title = Column(String(100),nullable=True,default='获取该妹子信息失败')
# 页面标题长度为100,并且允许为空,如果为空的话就生成上面的default
url = Column(String(100),nullable=False,unique=True)
# 该页面的主网址长度100,不允许为空,不允许重复
show_img = Column(String(100),nullable=False)
# 页面的单张展示图片
all_img = Column(String(100),nullable=False)
# 页面的所有图片
def sample(self):
pass
妹子图项目映射到数据库
然后实例化sql的对象,新增代码:
from flask_sqlalchemy import SQLAlchemy
# 实例化sqlalchemy对象,在flask中导入进来
db = SQLAlchemy()
# db就是sqlalchemy的初始化的核心对象
class data(db.Model):
id= xxx重复上面的代码
把之前的类继承db即可。然后回到主目录下的init.py文件下,该文件之前是把蓝图注册到app里面的,之前的init.py代码如下
# -*- coding: utf-8 -*-
# @Time : 2018/7/19 0019 18:36
# @Author : Langzi
# @Blog : www.sxadmin.github.io
# @File : __init__.py.py
# @Software: PyCharm
import sys
from flask import Flask
from app.web.data.creata_database import db
reload(sys)
sys.setdefaultencoding('utf-8')
def create_app():
app = Flask(__name__,template_folder=('web/templates'))
app.config.from_object('config')
start_Blueprint(app)
return app
def start_Blueprint(app):
from app.web.Mmzi import web
app.register_blueprint(web)
导入data目录下create_database.py中的db对象,然后注册到app当中并且初始化对象,并且调用他。
# -*- coding: utf-8 -*-
# @Time : 2018/7/19 0019 18:36
# @Author : Langzi
# @Blog : www.sxadmin.github.io
# @File : __init__.py.py
# @Software: PyCharm
import sys
from flask import Flask
#from app.web.data.creata_database import db
reload(sys)
sys.setdefaultencoding('utf-8')
def create_app():
app = Flask(__name__,template_folder=('web/templates'))
app.config.from_object('config')
start_Blueprint(app)
db.init_app(app)
with app.app_context():
db.create_all()
return app
def start_Blueprint(app):
from app.web import web
app.register_blueprint(web)
app就像一个插板一样,蓝图可以注册到app里面,路由注册到app里面,甚至数据库也可以注册到app里面,后期的登录等模块一样注册到里面,当前已经差不多配置完成了,还有重要的一点没说,那就是链接到哪个数据库里面。
在主目录下的config.py中可以添加数据库的配置信息,之前config之定义了DEBUG的开关,这个时候可以加入数据库配置。
SQLAlchemy本身无法操作数据库,其必须以来pymsql,cymysql,mysqldb等第三方插件,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,下面使用过cymysql进行操作
app主目录下的config.py配置文件
# -*- coding: utf-8 -*-
# @Time : 2018/7/17 0017 20:42
# @Author : Langzi
# @Blog : www.sxadmin.github.io
# @File : config.py
# @Software: PyCharm
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'mysql+cymysql://root:root@127.0.0.1:3306/meizi'
# SQLALCHEMY_DATABASE_URI是flask定义好的配置项,就像DEBUG一样
# mysql + cymysql : 使用mysql数据库,驱动使用cymysql(需要pipenv install cymysql),你还可以使用pymysql
# 然后数据库账号密码,主机和端口以及数据库名字
最后允许主程序的时候,会发现数据库中多了一个名为meizi的数据库,其中多了一张名为data的表,数据库的结构和定义的结构一致。
不同连接数据库插件使用方法
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
方法一:使用Pymysql与SQLAlchemy操作数据库
# coding=utf-8
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:xiaoming.note5@115.159.193.77:3306/school?charset=utf8", max_overflow=5)
# 执行SQL
cur = engine.execute(
"insert into user (name, password) values('lihy', 'lihy')"
)
# 新插入行自增ID
cur.lastrowid
# 执行SQL
cur = engine.execute(
"insert into user(name, password) values(%s, %s)", [('liq', 'liq'), ('liuxj', 'liuxj235')]
)
# 执行SQL
cur = engine.execute(
"insert into user(name, password) values(%(name)s, %(password)s)", name='lium', password='lium123'
)
# 执行SQL
cur = engine.execute('select * from user')
# 获取第一行数据, 第n行,所有数据
cur.fetchone()
cur.fetchmany(3)
cur.fetchall()
方法二:使用SQLALchemy操作数据库
初始化对象
# -*-coding:utf-8-*-
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import datetime
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1:3306/movie'
# 用于连接数据的数据库
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。
db = SQLAlchemy(app)
# 实例化db对象
创建数据库模型
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(35), unique=True)
pwd = db.Column(db.String(20))
phone = db.Column(db.String(11), unique=True)
def __repr__(self):
# repr返回一个对象的 string 格式,repr的用法是:比如a='admin',repr(a)>>>“‘admin’”
return '<User %r>'%self.name
数据库外键关联
关系使用 relationship() 函数表示。然而外键必须用类 sqlalchemy.schema.ForeignKey 来单独声明:
class Person(db.Model):
__tablename__ = 'persion'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
name = db.Column(db.String(100),index=True)
pwd = db.Column(db.String(100),index=True)
info = db.relationship('Address',backref = 'ss')
class Address(db.Model):
__tablename__ = 'address'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(50))
persion_id = db.Column(db.Integer, db.ForeignKey('persion.id'))
上面的例子中,address表中的person_id关联对象为person表的id的值。随后在person表中设置关联属性,关联属性名为info,第一个参数指向Address模型,另一个名叫backref的参数,叫做反向关系,我们将其设置成backref=ss。设置好了后它会向Persion模型中添加一个名为info的属性,这个属性可以代替persion_id去直接访问Persion模型的对象。
>>>r=persion(id=1,name='admin')
>>>u=User(id=10,name='Hyman',persion_id=1)
>>>r.info.append(u)
>>>print u.role
<Persion admin>
我们将u添加到r的info中,这样我们同时也在u中添加了一个名叫做persion的属性,而这属性就是我们定义的r对象.这就是所谓的可以用info代替persion_id访问Role模型,但是它获取的是Persion模型对象而非器对应的id的值…
如果使用MYSQL语句创建数据库然后使用外键关联的话,这么写:
用户表
create table user(
id int primary key autoincrement,
username varchar(100) not null
)
文章表
create table article(
id int primary key autoincrement,
title varchar(100) not null,
content text,
authot_id int, //这个是关联用户表的id字典,要记住一个表的外键一定是另一个表的主键
foreign ket 'author_id' references 'user.id'
)
就这个例子如果使用SQLALchemy的话,创建模型这么写
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
username = db.Column(db.String(100),nullable=False)
class Article(db.Model):
__tablename__ = 'article'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
title = db.Column(db.String(100),nullable=False)
content = db.Column(db.Text)
author_id = db.Column(db.Integer,db.ForeignKey('user.id'))# 引用user表的id字段
执行创建数据库
db.create_all()
增删改查
向数据库插入数据分为三个步骤:
- 创建 Python 对象
- 把它添加到会话
- 提交会话
增
from 你的数据库模型文件 import 你创建的数据库中的表名
me = 你创建的数据库中的表名('admin','18')
db.session.add(me)
db.session.commit()
删
db.session.delete(me)
db.session.commit()
查
假设数据库中有如下条目,并且下面条目的表名为User:
id username email
1 admin admin@example.com
2 peter peter@example.org
3 guest guest@example.com
获取所有数据:
res = User.query(username).all()
获取返回数据的第一行:
res = User.query(username).first()
过滤数据:
res = User.query(username).filter(id>1).all()
limit:
res = User.query(username).all()[1:3]
and or:
from sqlalchemy import and_
User.query.filter(and_(id==1, username=='admin')).all()
User.query.filter(id==1, username=='admin').all()
User.query.filter(id==1).filter(username=='admin').all()
from sqlalchemy import or_
User.query.filter(or_(id==1, id==2)).all()
多条件查询:
res = User.query(username).filter(id>0).filter(id<7).all()
通过用户名查询用户:
peter = User.query.filter_by(username='peter').first()
peter.id
>>> 1
peter.email
>>> peter@example.org
如果查询一个不存在的用户名返回 None
使用更复杂的表达式查询一些用户:
User.query.filter(User.email.endswith('@example.com')).all()
>>> [<User u'admin'>, <User u'guest'>]
返回对象是列表
按某种规则对用户排序:
User.query.order_by(User.username)
>>> [<User u'admin'>, <User u'guest'>, <User u'peter'>]
限制返回用户的数量:
User.query.limit(1).all()
>>> [<User u'admin'>]
filter_by
和filter
都是过滤条件,只是用法有区别filter_by
里面不能用!=
还有> <
等等,所有filter
用得更多,filter_by
只能用=
。
改
res = User.query.filter(User.username='admin').first()
res.username='root'
db.sission.commit()
实例演示一
#coding:utf-8
from flask_sqlalchemy import SQLAlchemy
from flask import Flask,render_template
app = Flask(__name__,template_folder='')
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1:3306/test404?charset=utf8'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
class Article(db.Model):
__tablename__ = 'article'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
title = db.Column(db.String(50),index=True)
content = db.Column(db.Text)
db.create_all()
@app.route('/')
def index():
# # 添加数据
# # insert article(title,content) values('妹子大粑粑','AAbbCCaaaeeA啊啊')
# ti = Article(title = '妹子大粑粑',content='AAbbCCaaaeeA啊啊')
# db.session.add(ti)
# db.session.commit()
# # 查询数据
# # select * from article where title='妹子大粑粑'
# result = Article.query.filter(Article.title=='妹子大粑粑').first().content
# # 第一条的内容
# res1 = Article.query.filter(Article.title=='妹子大粑粑').all()[0].content
# # 第一条的内容
# return render_template('index.html',x=res1)
# # 修改数据
# # 先查询到要修改的数据,然后对需要修改的地方修改,然后提交
# result = Article.query.filter(Article.title == '妹子大粑粑').first()
# result.content = '啊啊啊啊啊啊啊啊啊去啊啊啊啊啊啊啊土拨鼠啊'
# db.session.commit()
# # 删除数据
# # 同上
# result = Article.query.filter(Article.title == '妹子大粑粑').first()
# db.session.delete(result)
# db.session.commit()
app.run(debug=True)
实例演示二
创建一个数据库对象
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
name = Column(String(32))
def __repr__(self):
return "<Person(name='%s')>" % self.name
添加数据
创建一个person对象
person = Person(name='jack')
添加person对象,但是仍然没有commit到数据库
session.add(person)
commit操作
session.commit()
如何获取id的?
>>> person = Person(name='ilis')
>>> person.id #此时还没有commit到mysql,因此无id
>>> session.add(person)
>>> person.id #同上
>>> session.commit()
2015-08-18 23:08:23,530 INFO sqlalchemy.engine.base.Engine INSERT INTO person (name) VALUES (%s)
2015-08-18 23:08:23,531 INFO sqlalchemy.engine.base.Engine ('ilis',)
2015-08-18 23:08:23,532 INFO sqlalchemy.engine.base.Engine COMMIT
>>> person.id #commit后,可以获取该对象的id
2015-08-18 23:08:27,556 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-08-18 23:08:27,557 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.id = %s
2015-08-18 23:08:27,557 INFO sqlalchemy.engine.base.Engine (5L,)
5L
>>>
添加多个数据
session.add_all([
Person(name='jack'),
Person(name='mike')
])
session.commit()
回滚
>>> person = Person(name='test')
>>> session.add(person)
>>> session.query(person).filter(name=='test')
>>> session.query(Person).filter(Person.name=='test').all()
2015-08-18 23:13:23,265 INFO sqlalchemy.engine.base.Engine INSERT INTO person (name) VALUES (%s)
2015-08-18 23:13:23,265 INFO sqlalchemy.engine.base.Engine ('test',)
2015-08-18 23:13:23,267 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.name = %s
2015-08-18 23:13:23,267 INFO sqlalchemy.engine.base.Engine ('test',)
[<demo.Person object at 0x7f4e37730510>]
>>> session.rollback()
2015-08-18 23:13:37,496 INFO sqlalchemy.engine.base.Engine ROLLBACK
>>> session.query(Person).filter(Person.name=='test').all()
2015-08-18 23:13:38,690 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-08-18 23:13:38,691 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.name = %s
2015-08-18 23:13:38,692 INFO sqlalchemy.engine.base.Engine ('test',)
[]
>>>
数据查询
使用Session的query()方法。
获取所有数据
session.query(Person).all()
获取某一列数据,类似于django的get,如果返回数据为多个则报错
session.query(Person).filter(Person.name==’jack’).one()
获取返回数据的第一行
session.query(Person).first()
过滤数据
session.query(Person.name).filter(Person.id>1).all()
limit
session.query(Person).all()[1:3]
order by
session.query(Person).ordre_by(-Person.id)
equal/like/in
query = session.query(Person)
query.filter(Person.id==1).all()
query.filter(Person.id!=1).all()
query.filter(Person.name.like('%ac%')).all()
query.filter(Person.id.in_([1,2,3])).all()
query.filter(~Person.id.in_([1,2,3])).all()
query.filter(Person.name==None).all()
and or
from sqlalchemy import and_
query.filter(and_(Person.id==1, Person.name=='jack')).all()
query.filter(Person.id==1, Person.name=='jack').all()
query.filter(Person.id==1).filter(Person.name=='jack').all()
from sqlalchemy import or_
query.filter(or_(Person.id==1, Person.id==2)).all()
使用text
from sqlalchemy import text
query.filter(text("id>1")).all()
query.filter(Person.id>1).all() #同上
query.filter(text("id>:id")).params(id=1).all() #使用:,params来传参
query.from_statement(
text("select * from person where name=:name")).\
params(name='jack').all()
计数
Query使用count()函数来实现查询计数。
query.filter(Person.id>1).count()
group by的用法
from sqlalchemy import func
session.query(func.count(Person.name), Person.name),group_by(Person.name).all()
实现count(*)来查询表内行数
session.query(func.count('*')).select_from(Person).scalar()
session.query(func.count(Person.id)).scalar()
Pymysql with 操作
import contextlib
@contextlib.contextmanager
def mysql(host='127.0.0.1',user='root',passwd='root',db='meizi',port=3306,charset='utf8'):
conn = pymysql.connect(host='127.0.0.1',user='root',passwd='root',db='meizi',port=3306,charset='utf8')
cursor = conn.cursor()
try:
yield cursor
finally:
conn.commit()
cursor.close()
conn.close()
# # 执行sql
# with mysql() as cursor:
# print(cursor)
# row_count = cursor.execute("select * from tb7")
# row_1 = cursor.fetchone()
# print row_count, row_1
SQLALchemy with操作
SQLALCHEMY使用的是连接池进行数据操作,但是不同的页面操作数据耗时不一样,这就导致后面的进行执行插入数据的时候没有足够的进程可以提供操作就报错,解决方法及时扩大连接池加使用上下文管理,是每个操作及时关闭。
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://127.0.0.1/database?charset=utf8'
SQLALCHEMY_TRACK_MODIFIACTIONS = False
SQLALCHEMY_POOL_SIZE = 50
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1
在数据库模型设计文件中,新写一个上下文管理器
@contextlib.contextmanager
def data2mysql():
try:
yield db
except:
db.session.rollback()
finally:
db.session.remove()
在别的所有调用db.session()方法中全部使用with操作,节省连接池。
url = 'https://sxadmin.github.io'
first_ins = url_index(url=url)
with data2mysql() as dbs:
dbs.session.add(first_ins)
dbs.session.commit()
数据库连接池
DBUtils是Python的一个用于实现数据库连接池的模块
此连接池有两种连接模式:
DBUtils提供两种外部接口:
PersistentDB :提供线程专用的数据库连接,并自动管理连接。
PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
PersistentDB 模式
为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把链接重新放到链接池,供自己线程再次使用,当线程终止时,链接自动关闭
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='test',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__':
func()
PooledDB 模式
创建一批连接到连接池,供所有线程共享使用。
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='test',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
conn.close()
if __name__ == '__main__':
func()