Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

352人浏览   2023-10-23 14:34:23

自从学了Python和MySQL,再也不用为处理Exce报表而挠头了。下面请看笔者是如何处理 Excel报表的。

一、Excel 文件

F:/public/玉米报表.xls

内容如下图:成品入库表

成品出库表

二、需求

1、一键备份 Excel 数据到 MySQL 数据库

2、一键查询库存报表

三、使用展示

1、点击Run,一键运行

2、登录

选择第1项登录,没有用户名的,可以根据提示进行注册,具体注册方法,这里不再展示。

3、第1项查看数据库,第2项选择使用数据库,第3项创建数据库及表

4、选择使用的数据库,这里笔者选择 demdb。

5、选择第1项导入Excel 报表,可以重复导入,以更新数据表。

6、选择第5项,可以查询入库情况

7、选择第6项,可以查询出库情况

8、选择第7项,可以查询库存情况

四、代码

def import_data(user, password, db_name):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}", echo=False)

df1 = pd.read_excel("F:/public/玉米报表.xls", sheet_name="大包装入库")

df2 = pd.read_excel("F:/public/玉米报表.xls", sheet_name="小包装入库")

df3 = pd.read_excel("F:/public/玉米报表.xls", sheet_name="半成品入库")

df4 = pd.read_excel("F:/public/玉米报表.xls", sheet_name="成品入库")

df5 = pd.read_excel("F:/public/玉米报表.xls", sheet_name="成品出库")

print("read excel data successfully ! ")

# engine=create_engine("postgresql+psycopg2://wyj:wyj@127.0.0.1:5432/corn")

# engine = create_engine("mysql+mysqlconnector://wyj:wyj@127.0.0.1:3306/corn", echo=False)

# sql1 = "truncate table `大包装入库`;"

# sql2 = "truncate table `小包装入库`;"

# sql3 = "truncate table `半成品入库`;"

# sql4 = "truncate table `成品入库`;"

# sql5 = "truncate table `成品出库`;"

sql1 = "drop table if exists `big_bag_in`;"

sql2 = "drop table if exists `small_bag_in`;"

sql3 = "drop table if exists `semi_in`;"

sql4 = "drop table if exists `finished_in`;"

sql5 = "drop table if exists `finished_out`;"

with engine.connect() as conn:

conn.execute(sql1)

conn.execute(sql2)

conn.execute(sql3)

conn.execute(sql4)

conn.execute(sql5)

print("executed successfully !")

df1.to_sql(name="big_bag_in", con=engine, index=False, if_exists="replace")

df2.to_sql(name="small_bag_in", con=engine, index=False, if_exists="replace")

df3.to_sql(name="semi_in", con=engine, index=False, if_exists="replace")

df4.to_sql(name="finished_in", con=engine, index=False, if_exists="replace")

df5.to_sql(name="finished_out", con=engine, index=False, if_exists="replace")

print("imported successfully !")

except Exception as e:

print(e)

def query_data(user, password, db_name, tb_name):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}", echo=False)

sql = f"select * from {tb_name} limit 5;"

df = pd.read_sql(sql, engine)

print("""

查询结果

""")

print("-------------------------------------------------")

print(df)

print("-------------------------------------------------")

except Exception as e:

print(e)

def delete_data(user, password, db_name, tb_name, tb_id, product_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)

with engine.connect() as conn:

conn.execute(f"use {db_name}")

print(f"{db_name} selected !")

sql1 = "show tables;"

result = conn.execute(sql1).fetchall()

print(result)

sql2 = f"delete from {tb_name} where sht_id = %s and product=%s"

paras = (tb_id, product_name)

conn.execute(sql2, paras)

print("delete successfully !")

except Exception as e:

print(e)

def update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2):

from sqlalchemy import create_engine

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)

with engine.connect() as conn:

conn.execute(f"use {db_name}")

print(f"{db_name} selected !")

sql1 = "show tables;"

result = conn.execute(sql1).fetchall()

print(result)

sql2 = f"update {tb_name} set 发货=%s,实销=%s where sht_id = %s and product=%s"

paras = (new_qty1, new_qty2, tb_id, product_name)

conn.execute(sql2, paras)

print("updated successfully !")

except Exception as e:

print(e)

def stockIn_list(user, password, db_name, tb_in):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}", echo=False)

sql = f"select product,small_bag,小袋数量 as 入库 from {tb_in} where product is not null;"

df = pd.read_sql(sql, engine)

print("""

入库明细

""")

print("--------------------------------------------")

print(df)

print("--------------------------------------------")

except Exception as e:

print(e)

def stockOut_list(user, password, db_name, tb_out):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}", echo=False)

sql = f"select customer,product,包装类型,发货 as 出库 from {tb_out};"

df = pd.read_sql(sql, engine)

print("""

出库明细

""")

print("--------------------------------------------")

print(df)

print("--------------------------------------------")

except Exception as e:

print(e)

def inventory(user, password, db_name, tb_in, tb_out):

from sqlalchemy import create_engine

# import pandas as pd

pd.set_option('display.unicode.ambiguous_as_wide', True)

pd.set_option('display.unicode.east_asian_width', True)

pd.set_option('expand_frame_repr', False)

pd.set_option('display.max_columns', None, 'display.max_rows', None)

pd.set_option('display.max_rows', 5000)

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}", echo=False)

sql1 = "select product,description,小袋数量 as 入库 from %s where product is not null;" % tb_in

sql2 = "select product,description,发货 as 出库,其他出库 from %s;" % tb_out

df1 = pd.read_sql(sql1, engine)

df2 = pd.read_sql(sql2, engine)

df1.fillna(0, inplace=True)

df2.fillna(0, inplace=True)

print("""

入库明细

""")

print("--------------------------------------------")

print(df1)

print("--------------------------------------------")

print("""

出库明细

""")

print("--------------------------------------------")

print(df2)

print("--------------------------------------------")

df1 = df1.groupby(['product', 'description'])[['入库']].sum()

df2 = df2.groupby(['product', 'description'])[['出库', '其他出库']].sum()

df = df1.merge(df2, how='left', left_on=['product', 'description'], right_on=['product', 'description'])

df.fillna(0, inplace=True)

df['库存'] = df['入库'] - df['出库'] - df['其他出库']

df = pd.pivot_table(df, index=['product', 'description'], values=['入库', '出库', '其他出库', '库存'], aggfunc=sum,

margins=True)

print("""

库存汇总

""")

print("------------------------------------------------------------")

print(df)

print("------------------------------------------------------------")

print('Finished !')

except Exception as e:

print(e)

def show_databases(user, password):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)

sql = "show databases;"

df = pd.read_sql(sql, engine)

print(df)

except Exception as e:

print(e)

def use_database(user, password, db_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)

with engine.connect() as conn:

conn.execute(f"use {db_name}")

print(f"{db_name} selected !")

sql = "show tables;"

result = conn.execute(sql).fetchall()

print("table list:")

print("-------------")

for re in result:

print(re[0])

print("-------------")

except Exception as e:

print(e)

def create_db(user, password, db_name, tb1_name, tb2_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306", echo=False)

sql1 = f"create database if not exists {db_name};"

with engine.connect() as conn:

conn.execute(sql1)

conn.execute("commit;")

print(f"{db_name} created successfully !")

with engine.connect() as conn:

conn.execute(f"use {db_name}")

conn.execute("commit;")

sql2 = f"""create table if not exists {tb1_name}(

product_id int auto_increment primary key,

product_name varchar(25) not null ,

description varchar(50),

price decimal(4,2),

qty int not null default 0,

ctime datetime default current_timestamp on update current_timestamp,

utime datetime default current_timestamp on update current_timestamp

) engine=InnoDB default charset=utf8;

"""

sql3 = f"""create table if not exists {tb2_name}(

product_id int auto_increment primary key,

product_name varchar(25) not null ,

description varchar(50),

price decimal(4,2),

qty int not null default 0,

ctime datetime default current_timestamp on update current_timestamp,

utime datetime default current_timestamp on update current_timestamp

) engine=InnoDB default charset=utf8;

"""

with engine.connect() as conn:

conn.execute(sql2)

conn.execute("commit;")

print(f"{tb1_name} created successfully !")

conn.execute(sql3)

print(f"{tb2_name} created successfully !")

except Exception as e:

print(e)

def main():

# import pandas as pd

import login_func as login

import getpass

pd.set_option('display.unicode.ambiguous_as_wide', True)

pd.set_option('display.unicode.east_asian_width', True)

pd.set_option('expand_frame_repr', False)

pd.set_option('display.max_columns', None, 'display.max_rows', None)

pd.set_option('display.max_rows', 5000)

msg1 = """

登录界面

--------------------------------------------

0. exit/quit

1. login

2. register

3. insert employee info

--------------------------------------------

"""

msg2 = """

欢迎进入商品进销存管理系统 2.0 版

------------------------------------------------

0. quit

1. show databases

2. use database

3. create new database and new table

-------------------------------------------------

"""

while True:

print(msg1)

choice = input('please input your choice: ')

if choice == 'q' or choice == 'Q' or choice == '0':

print('quit !')

break

elif choice == '1':

user = input("please input user name: ")

password = getpass.getpass("enter password:")

# password = input("please input password: ")

is_login = login.login(user, password)

if not is_login:

print("用户名或密码错误!")

else:

print(f"{user} login successfully!")

while True:

print(msg2)

handle = input('请选择操作项目: ')

if handle == 'q' or handle == 'Q' or handle == '0':

print('quit !')

break

elif handle == '1':

user = 'wyj'

password = 'wyj'

show_databases(user, password)

elif handle == '2':

user = 'wyj'

password = 'wyj'

db_name = input("please select database name: ")

use_database(user, password, db_name)

msg3 = """

商品管理系统 2.0 版

--------------------------------

0. exit

1. 导入 Excel 数据表

2. 查询

3. 删除数据

4. 修改数据

5. 入库查询

6. 出库查询

7. 库存查询

--------------------------------

"""

while True:

print(msg3)

num = input('请选择操作项目编号: ')

if num == 'q' or num == 'Q' or num == '0':

print('退出系统 !')

break

elif num == '1':

import_data(user, password, db_name)

elif num == '2':

print("----------query table >tb_name = input("table name: ")

query_data(user, password, db_name, tb_name)

elif num == '3':

print("----------delete table >tb_name = input("table name: ")

tb_id = input("table id: ")

product_name = input("input product name: ")

delete_data(user, password, db_name, tb_name, tb_id, product_name)

elif num == '4':

print("----------update table >tb_name = input("please input table name: ")

tb_id = input("please input table id: ")

product_name = input("please input product name: ")

new_qty1 = input("please input new qty1: ")

new_qty2 = input("please input new qty2: ")

update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2)

elif num == '5':

tb_in = "finished_in"

stockIn_list(user, password, db_name, tb_in)

elif num == '6':

tb_out = "finished_out"

stockOut_list(user, password, db_name, tb_out)

elif num == '7':

tb_in = "finished_in"

tb_out = "finished_out"

inventory(user, password, db_name, tb_in, tb_out)

else:

print('input error!,please input again !')

elif handle == '3':

user = 'wyj'

password = 'wyj'

db_name = input("please input new database name: ")

tb1_name = input("please input new table1 name: ")

tb2_name = input("please input new table2 name: ")

create_db(user, password, db_name, tb1_name, tb2_name)

break

else:

print('Error! please input again !')

elif choice == '2':

register_name = input("please input register name: ")

employee_id = input("please input employee id : ")

is_user_exists = login.user_exists(register_name)

is_employee_exists = login.employee_exists(employee_id)

if is_user_exists:

print("user exists ,can not register,please change another user name.")

elif not is_employee_exists:

print('not employee , can not register.')

else:

pwd1 = input('please input your password: ')

pwd2 = input('please confirm your password: ')

if pwd1 == pwd2:

login.register(register_name, pwd1)

print("ok,registered successfully!")

elif choice == '3':

employee_id = input("please input employee id: ")

employee_name = input("please input employee name: ")

login.insert_employee_info(employee_name, employee_id)

else:

print('Error! please input again !')

if __name__ == '__main__':

import pandas as pd

pd.set_option('display.unicode.ambiguous_as_wide', True)

pd.set_option('display.unicode.east_asian_width', True)

pd.set_option('expand_frame_repr', False)

pd.set_option('display.max_columns', None, 'display.max_rows', None)

pd.set_option('display.max_rows', 5000)

main()

好了,还不赶快去试一下。

相关推荐