1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
| import datetime
from baseConfig import DATABASE
DB_CONF = DATABASE['mysql']
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, ForeignKey, CHAR, BOOLEAN, VARCHAR, \ DateTime from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base
# Shared engine for the MySQL database described by DB_CONF; the URL
# placeholders (username, password, host, port, name) are filled from its keys.
# echo=True makes SQLAlchemy log the SQL statements it emits.
engine = create_engine(
    "mysql+pymysql://{username}:{password}@{host}:{port}/{name}".format_map(DB_CONF),
    echo=True,
)

# Registry for the Core-style tables of this module, created with the engine
# as its bind.
metadata = MetaData(engine)
# Core table "customer": integer primary key plus two string columns.
customer = Table(
    'customer',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    Column('fullname', String(40)),
)

# Core table "address": every row points at a customer through customer_id.
# The column type is given as None; SQLAlchemy then takes the type from the
# column referenced by the ForeignKey (customer.id).
address_table = Table(
    'address',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('customer_id', None, ForeignKey('customer.id')),
    Column('email', String(128), nullable=False),
)
def execute_create():
    """Create the tables registered on the module-level ``metadata``.

    The engine is passed explicitly instead of relying on the metadata
    carrying an implicit bind: implicit ("bound metadata") execution is
    deprecated in SQLAlchemy 1.4 and removed in 2.0, while ``bind=`` is
    accepted by every release.

    Returns:
        None.
    """
    metadata.create_all(bind=engine)
# Declarative base class that the ORM models of this module inherit from.
Base = declarative_base()
# Attach the shared engine to the metadata that collects those models.
Base.metadata.bind = engine
class Person(Base):
    """ORM model mapped to the ``person`` table."""

    __tablename__ = 'person'

    # Integer primary key.
    id = Column(Integer, primary_key=True)

    name = Column('name', String(250))
    cname = Column('cname', CHAR(30), nullable=True)
    age = Column('age', Integer)
    position = Column('position', VARCHAR(20))
    address = Column('address', VARCHAR(50), nullable=True)

    # The callable default is invoked when a row is inserted without a value,
    # so the column receives the naive local time of the insert.
    creatime = Column('creatime', DateTime, default=datetime.datetime.now)
    flag = Column('flag', BOOLEAN, default=False)
# Session factory tied to the shared engine.
DBSession = sessionmaker(bind=engine)

# Single module-wide session used by the session_* helpers in this file.
session = DBSession()
def create_table():
    """Create the tables of all models declared on ``Base`` (e.g. ``Person``).

    The engine is passed explicitly: calling ``create_all()`` with no bind
    depends on bound metadata, which is deprecated in SQLAlchemy 1.4 and
    removed in 2.0, whereas ``bind=`` works on every release.

    Returns:
        None.
    """
    Base.metadata.create_all(bind=engine)
def session_select():
    """Run a raw ``select * from user`` on the shared session and print the rows.

    The statement is wrapped in ``text()``: handing a plain string to
    ``Session.execute`` is deprecated in SQLAlchemy 1.4 and rejected in 2.0,
    while ``text()`` is accepted by every release.

    NOTE(review): no ``user`` table is declared in the visible part of this
    file -- confirm that it exists in the target database.

    Returns:
        None. The fetched rows are only printed.
    """
    # Local import keeps the module's top-level import list unchanged.
    from sqlalchemy import text

    query_1 = session.execute(text('select * from user'))
    res1 = query_1.fetchall()
    print('sql查询 res1: ', res1)
    print('==============================================')
def session_add(data, table=None):
    """Insert one row per mapping in ``data`` and commit them together.

    Args:
        data: Iterable of dicts; each dict is expanded into keyword arguments
            for the model constructor.
        table: Mapped class to instantiate. Defaults to ``Person``.

    Raises:
        Exception: Whatever building the objects or committing raises. The
            shared session is rolled back before the error is re-raised, so
            later helpers do not inherit a failed transaction.
    """
    model = Person if table is None else table
    try:
        # All rows go into one transaction: either every row is stored or none.
        session.add_all([model(**fields) for fields in data])
        session.commit()
    except Exception:
        session.rollback()
        raise
def session_update_v1(table: str, data: dict, where=''):
    """Build and run ``UPDATE <table> SET ... [WHERE ...]`` on the shared session.

    WARNING: ``table``, the keys and values of ``data`` and ``where`` are
    pasted into the SQL text verbatim. String values must already carry their
    own SQL quotes (for example ``"'php'"``), and none of the arguments may
    come from untrusted input. Because the statement goes through ``text()``,
    a ``:name`` sequence in it is read as a bind parameter.

    Args:
        table: Name of the table to update.
        data: Mapping of column name to SQL expression. Values that are not
            strings are converted with ``str.format``.
        where: Optional condition, written without the ``WHERE`` keyword.

    Returns:
        True when the statement was executed and committed. False when
        ``data`` is empty or when any error occurred; the error is printed
        and the session is rolled back.
    """
    if not data:
        # "UPDATE t SET" with no assignments is not valid SQL; skip the
        # database round trip and report failure right away.
        return False

    # Local import keeps the module's top-level import list unchanged.
    from sqlalchemy import text

    try:
        assignments = ','.join(
            '{0} = {1}'.format(column, value) for column, value in data.items()
        )
        where_clause = ' WHERE ' + where if where else ''
        sql_update = 'UPDATE {0} SET {1} {2}'.format(table, assignments, where_clause)
        session.execute(text(sql_update))
        session.commit()
    except Exception as e:
        print(e)
        # Leave the shared session usable for whichever helper runs next.
        session.rollback()
        return False
    return True
def session_update_v2():
    """Bulk-update ``Person`` rows named 'alfred001' through the ORM.

    Matching rows get ``name`` set to 'xxx' and ``cname`` set to 'superman'.

    Raises:
        Exception: Whatever the update or the commit raises. The shared
            session is rolled back before the error is re-raised.
    """
    matching = session.query(Person).filter_by(name='alfred001')
    try:
        # Query.update() returns the number of matched rows; it is not needed here.
        matching.update({Person.name: 'xxx', Person.cname: 'superman'})
        session.commit()
    except Exception:
        session.rollback()
        raise
def session_delete():
    """Delete the first ``Person`` row whose name is 'sum', if there is one.

    When no row matches, the function returns without touching the database
    instead of failing with ``IndexError`` on an empty result.

    Raises:
        Exception: Whatever the delete or the commit raises. The shared
            session is rolled back before the error is re-raised.
    """
    row = session.query(Person).filter_by(name='sum').first()
    if row is None:
        # Nothing to delete.
        return

    print(row)
    try:
        session.delete(row)
        session.commit()
    except Exception:
        session.rollback()
        raise
def session_drop():
    """Drop the tables of all models declared on ``Base``.

    The engine is passed explicitly: calling ``drop_all()`` with no bind
    depends on bound metadata, which is deprecated in SQLAlchemy 1.4 and
    removed in 2.0, whereas ``bind=`` works on every release.

    Returns:
        None.
    """
    Base.metadata.drop_all(bind=engine)
def code_test():
    """Scratch entry point that only builds sample payloads.

    Nothing is sent to the database here: both structures are assigned to
    locals and the function returns ``None``.
    """
    # Sample rows whose keys match Person columns (name, age, position).
    data = [
        {'name': 'alfred', 'age': '28', 'position': 'python'},
        {'name': 'xxx', 'age': '28', 'position': 'php'},
        {'name': 'sum', 'age': '26', 'position': 'ruby'},
    ]
    # The name and position values carry their own SQL quotes -- presumably
    # meant for session_update_v1, which pastes values into the statement
    # verbatim (TODO confirm).
    data2 = {
        'name': "'alfred001'",
        'age': '28',
        'position': "'php+web'",
    }
# Run the scratch test only when the file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    code_test()
|