# -*- coding: utf-8 -*-
"""
Copyright (c) 2024, Nimblex Co., Ltd.
Created on 2024-05-06 15:42
Notes:
1. Python 3.8
2. Install psycopg2 (macOS users should install psycopg2-binary) and SQLAlchemy with pip inside a virtual environment
"""
import threading
import sys
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, text
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError, SQLAlchemyError
# Declarative base class that the ORM model(s) in this file inherit from;
# its metadata is what main() passes to create_all().
Base = declarative_base()
class Counter(Base):
    """ORM model mapped to the ``counters`` table."""

    __tablename__ = 'counters'
    # Integer primary key of the row.
    id = Column(Integer, primary_key=True)
    # Integer value that the worker threads read and increment.
    counter = Column(Integer)
# Module-level SQLAlchemy engine; stays None until init_db_engine() assigns it.
engine = None
def init_db_engine(host, port, dbname, user, password):
    """Create the shared SQLAlchemy engine and store it in the module-level ``engine``.

    The user name and password are percent-encoded before being placed in
    the connection URL, so credentials containing URL-reserved characters
    (``@``, ``:``, ``/``, ``%`` ...) no longer corrupt the URL. Pass the raw,
    un-encoded credentials.

    Args:
        host: Database server host name or address.
        port: Database server port.
        dbname: Name of the database to connect to.
        user: Raw (un-encoded) login user name.
        password: Raw (un-encoded) login password.
    """
    global engine
    # Function-scope import keeps this fix self-contained.
    from urllib.parse import quote

    # safe="" encodes every reserved character, including "/" and "+", and
    # writes a space as %20 rather than "+", so decoding is unambiguous.
    quoted_user = quote(str(user), safe="")
    quoted_password = quote(str(password), safe="")
    uri = f"postgresql://{quoted_user}:{quoted_password}@{host}:{port}/{dbname}"
    engine = create_engine(
        uri,
        echo=False,
        pool_size=100,
        pool_recycle=3600,
        pool_pre_ping=True,
    )
def get_session():
    """Return a new ORM session bound to the module-level ``engine``.

    A fresh ``sessionmaker`` factory is built on every call and immediately
    invoked; the caller is responsible for closing the returned session.
    """
    # ``engine`` is only read here, so no ``global`` declaration is needed.
    return sessionmaker(bind=engine)()
def test_transaction_try_again():
    """Increment the first ``Counter`` row from five concurrent threads.

    Each thread runs ``retryer``, which repeats its read-increment-commit
    transaction for as long as the database reports a retryable
    ``OperationalError``. The function returns once all threads have been
    joined.
    """
    # Substrings of repr(exception) that mark an error as retryable.
    retry_markers = ('Try again', '40001', 'Restart read required')

    def retryer():
        """Increment the counter once, retrying on retryable errors.

        Raises:
            SQLAlchemyError: For any database error that is not a retryable
                ``OperationalError``; the transaction is rolled back first.
        """
        while True:
            # Acquire the session before entering the try block so that the
            # finally clause always closes the session of *this* attempt and
            # never references an unbound or stale name.
            session = get_session()
            try:
                cnt = session.query(Counter).first()
                cnt.counter = cnt.counter + 1
                session.add(cnt)
                session.commit()
                return
            except SQLAlchemyError as e:
                message = repr(e)
                if isinstance(e, OperationalError) and any(
                    marker in message for marker in retry_markers
                ):
                    # Retryable conflict: the finally clause closes this
                    # session, then the loop starts a fresh attempt.
                    print('try again')
                    continue
                # Everything else (including non-retryable OperationalError,
                # which used to be silently swallowed) is reported and
                # propagated.
                print("An error occurred: ", message)
                session.rollback()
                raise
            finally:
                session.close()

    threads = []
    for _ in range(5):
        t = threading.Thread(target=retryer)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
def main():
    """Set up the database, seed the ``counters`` table and run the retry test.

    The connection settings below are placeholders to be filled in before
    running. If seeding fails, the error is printed, the transaction is
    rolled back and the concurrent test is not started.
    """
    host = ""
    port = "5433"
    dbname = ""
    user = ""
    password = ""
    init_db_engine(host, port, dbname, user, password)
    Base.metadata.create_all(engine)
    session = get_session()
    try:
        # Delete the existing rows, then insert a single seed row.
        session.execute(text('delete from counters'))
        cnt = Counter(counter=999)
        session.add(cnt)
        session.commit()
        # Query the data (uncomment to inspect the seeded rows):
        # res = session.execute(text('select * from counters;'))
        # rows = res.fetchall()
        # for row in rows:
        #     print(row)
    except SQLAlchemyError as e:
        print("An error occurred: ", e)
        session.rollback()
        # The worker threads increment the first Counter row; without a
        # successful seed there is no guaranteed row for them to update,
        # so stop here (the finally clause still closes the session).
        return
    finally:
        session.close()
    test_transaction_try_again()
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()