本文主要描述使用python连接sqlite数据库,并进行简单的CURD操作
一、简介
SQLite是一种C语言的嵌入式的数据库,它实现了一个 小型, 快速, 自包含, 高可靠性, 功能齐全的 SQL数据库引擎,它是一个零配置的数据库,我们不需要在系统中配置。SQLite 直接访问其存储文件。
二、安装
安装比较简单,
具体的安装步骤,可以参考这篇博客:Windows 上如何安装Sqlite
三、使用
这里主要展示利用python中的api去连接和操作sqlite
首先,操作关系数据库,需要连接到数据库,一个数据库连接称为Connection;
之后,打开游标,称之为Cursor,通过Cursor执行SQL语句;
最后,获得执行结果。
1. 简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import sqlite3
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE USER (ID INT PRIMARY KEY, NAME VARCHAR(50) NOT NULL, AGE INT NOT NULL)')
conn.commit()
conn.close()
|
2. 自定义操作类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import sqlite3 import collections
class SQLiteAPI(object): def __init__(self, dbname=None): self.dbname = dbname if dbname is not None else 'test.db' self.conn = sqlite3.connect(self.dbname) self.cursor = self.conn.cursor() def create(self, table: str, fields: dict): ''' 建表(待完善) :param table: 表名 :param fields: 字段及类型 :return: 执行结果 ''' pass def insert(self, table: str, data: dict): ''' 插入语句 :param table: 表名 :param data: 字典(包含插入列,值) :return: 执行结果 ''' pass def select(self, table: str, cols='*', where=''): ''' 查询 :param table: 表名 :param cols: 查询列 :param where: 查询条件 :return: 查询结果 ''' pass def update(self, table: str, data: dict, where=''): ''' 更新表 :param table: 表名 :param data:更新数据 :param where:条件 :return: 执行结果 ''' pass def delete(self, table: str, where=''): ''' 删除数据 :param table: 表名 :param where: 条件 :return: 执行结果 ''' pass def drop(self, table: str): ''' 删除表 :param table: :return: ''' pass def close(self): self.cursor.close() self.conn.close()
|
2.1. 创建表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def create(self, table: str, fields: dict): ''' 建表(待完善) :param table: 表名 :param fields: 字段及类型 :return: 执行结果 ''' flag = True try: data = fields.items() func = lambda x: ' '.join(x) field_info = ','.join(list(map(func, data))) sql_create = f"CREATE TABLE {table} ({field_info})" print(sql_create) self.cursor.execute(sql_create) self.conn.commit() except Exception as e: print(e) flag = False return flag
|
2.2. 插入语句(增加记录)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def insert(self, table: str, data: dict): ''' 插入语句 :param table: 表名 :param data: 字典(包含插入列,值) :return: 执行结果 ''' flag = True count = 0 try: data = collections.OrderedDict(data) keys = tuple(data.keys()) vals = tuple(data.values()) sql_insert = '''INSERT INTO %s %s VALUES %s''' % (table, keys, vals) print(sql_insert) self.cursor.execute(sql_insert) count = self.cursor.rowcount self.conn.commit() except Exception as e: print(e) flag = False return flag, count
|
2.3. 查询操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| def select(self, table: str, cols='*', where=''): ''' 查询 :param table: 表名 :param cols: 查询列 :param where: 查询条件 :return: 查询结果 ''' values = None try: where = 'WHERE ' + where if where else '' sql_select = '''SELECT {1} FROM {0} {2}'''.format(table, cols, where) print(sql_select) self.cursor.execute(sql_select) values = self.cursor.fetchall() except Exception as e: print(e) return values
|
2.4. 更新操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| def update(self, table: str, data: dict, where=''): ''' 更新表 :param table: 表名 :param data:更新数据 :param where:条件 :return: 执行结果 ''' flag = True count = 0 try: data = data.items() func = lambda x: ' = '.join(x) field_info = ','.join(list(map(func, data))) where = ' WHERE ' + where if where else '' sql_update = '''UPDATE {0} SET {1} {2}'''.format(table, field_info, where) print(sql_update) self.cursor.execute(sql_update) count = self.cursor.rowcount self.conn.commit() except Exception as e: print(e) flag = False return flag, count
|
2.5. 删除操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def delete(self, table: str, where=''): ''' 删除数据 :param table: 表名 :param where: 条件 :return: 执行结果 ''' flag = True count = 0 if not where: print('条件不能为空,请重新尝试!') return False, count try: sql_delete = '''DELETE FROM {0} WHERE {1}'''.format(table, where) self.cursor.execute(sql_delete) count = self.cursor.rowcount self.conn.commit() except Exception as e: print(e) flag = False return flag, count
|
2.6. 删除表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def drop(self, table: str): ''' 删除表 :param table: :return: ''' flag = True try: sql_drop = f'''DROP TABLE {table}''' self.cursor.execute(sql_drop) self.conn.commit() except Exception as e: print(e) flag = False return flag
|
3. 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| def code_test(): fields = { 'id ': 'INT PRIMARY KEY', 'name': 'VARCHAR(50) NOT NULL', 'phone': 'VARCHAR(20)', 'address': 'VARCHAR(50)', 'age': 'INT NOT NULL', 'position': 'VARCHAR(20)', 'salary': 'REAL', } insert_data = { 'id': '1', 'name': 'alfred', 'phone': '111111', 'age': '20', 'salary': '15000', } update_data = { 'name': '\'Lucy\'' } res4 = myApi.update('user', data=update_data, where='id=3') print('update result is :', res4)
myApi = SQLiteAPI()
if __name__ == '__main__': code_test()
|
结束,以上是对sqlite数据库的一些简单操作!
原文地址: CSDN博客 - Python(五)- 连接sqlite3以及CURD操作!