1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
# -*- coding:utf-8 -*-
import logging
from mysql.connector.pooling import CNX_POOL_MAXSIZE
from mysql.connector.pooling import MySQLConnectionPool, PooledMySQLConnection
from mysql.connector import errors
import threading
# Module-wide re-entrant lock shared by every Pool defined in this file;
# Pool.connect acquires it around its direct manipulation of pool internals.
CONNECTION_POOL_LOCK = threading.RLock()
class Pool(MySQLConnectionPool):
    """Connection pool that grows on demand and offers one-shot SQL helpers.

    ``connect()`` first asks the base pool for a connection.  When that
    fails with ``PoolError`` the pool is enlarged by one connection, up to
    ``CNX_POOL_MAXSIZE``; when it cannot be enlarged the caller blocks
    until a connection is handed back.

    The helper methods (``query``, ``get``, ``insert``, ...) each borrow a
    connection, run a single statement and release the connection again.

    NOTE(review): none of the write helpers calls ``commit()``; presumably
    the pool is configured with autocommit enabled -- confirm.  Use
    ``begin()`` for explicit transactions.
    """

    def connect(self):
        """Return a pooled connection, growing the pool or waiting if needed.

        :return: a ``PooledMySQLConnection``; closing it releases the
            connection (see ``_with_cursor``).
        :raises errors.InterfaceError: if a stale connection taken from the
            queue cannot be reconnected.
        """
        try:
            return self.get_connection()
        except errors.PoolError:
            # The base pool could not hand out a connection; try to make
            # room for one more below.
            pass
        if self._grow_by_one():
            # A fresh connection was queued; go and fetch it.  Another
            # thread may grab it first, in which case we simply grow again.
            return self.connect()
        return self._wait_for_connection()

    def _grow_by_one(self):
        """Add one connection to the pool; return True if it was added."""
        # A lock only excludes other threads when they all use the same
        # lock object, hence the shared module-level lock.
        with CONNECTION_POOL_LOCK:
            previous_size = self.pool_size
            # Re-check under the lock: another thread may have grown the
            # pool to its limit since the caller looked.
            if previous_size >= CNX_POOL_MAXSIZE:
                return False
            try:
                self._set_pool_size(previous_size + 1)
                self._cnx_queue.maxsize = previous_size + 1
                self.add_connection()
            except Exception as e:
                logging.exception(e)
                # Undo the bump so a failed attempt does not leave a slot
                # that no real connection occupies.
                self._set_pool_size(previous_size)
                self._cnx_queue.maxsize = previous_size
                return False
            return True

    def _wait_for_connection(self):
        """Block until a connection is released, then validate and wrap it."""
        # The blocking get happens outside the lock so that a waiting
        # thread does not keep other threads from growing the pool.
        cnx = self._cnx_queue.get(block=True)
        with CONNECTION_POOL_LOCK:
            stale = (not cnx.is_connected()
                     or self._config_version != cnx._pool_config_version)
            if stale:
                cnx.config(**self._cnx_config)
                try:
                    cnx.reconnect()
                except errors.InterfaceError:
                    # Failed to reconnect, give connection back to pool
                    self._queue_connection(cnx)
                    raise
                cnx._pool_config_version = self._config_version
            return PooledMySQLConnection(self, cnx)

    def _with_cursor(self, action, **cursor_kwargs):
        """Run ``action(cursor)`` on a borrowed connection and return its result.

        The cursor and the connection are always closed afterwards, even
        when ``action`` or the cursor's own ``close()`` raises.
        """
        cnx = self.connect()
        try:
            cursor = cnx.cursor(**cursor_kwargs)
            try:
                return action(cursor)
            finally:
                cursor.close()
        finally:
            cnx.close()

    def query(self, operation, params=None):
        """Execute ``operation`` and return ``cursor.fetchall()``.

        The cursor is created with ``buffered=True, dictionary=True``.
        """
        def fetch_all(cursor):
            cursor.execute(operation, params)
            return cursor.fetchall()
        return self._with_cursor(fetch_all, buffered=True, dictionary=True)

    def get(self, operation, params=None):
        """Execute ``operation`` and return ``cursor.fetchone()``.

        The cursor is created with ``buffered=True, dictionary=True``.
        """
        def fetch_one(cursor):
            cursor.execute(operation, params)
            return cursor.fetchone()
        return self._with_cursor(fetch_one, buffered=True, dictionary=True)

    def insert(self, operation, params=None):
        """Execute ``operation`` and return the cursor's ``lastrowid``."""
        def run(cursor):
            cursor.execute(operation, params)
            return cursor.lastrowid
        return self._with_cursor(run)

    def insert_many(self, operation, seq_params):
        """Run ``executemany(operation, seq_params)``; return ``rowcount``."""
        def run(cursor):
            cursor.executemany(operation, seq_params)
            return cursor.rowcount
        return self._with_cursor(run)

    def execute(self, operation, params=None):
        """Execute ``operation`` and return the cursor's ``rowcount``."""
        def run(cursor):
            cursor.execute(operation, params)
            return cursor.rowcount
        return self._with_cursor(run)

    def update(self, operation, params=None):
        """Alias of ``execute``; returns the cursor's ``rowcount``."""
        return self.execute(operation, params)

    def delete(self, operation, params=None):
        """Alias of ``execute``; returns the cursor's ``rowcount``."""
        return self.execute(operation, params)

    def begin(self, consistent_snapshot=False, isolation_level=None, readonly=None):
        """Start a transaction on a borrowed connection.

        The three arguments are passed through to the connection's
        ``start_transaction``.

        :return: a ``Transaction`` wrapping the connection; the caller must
            close it (or use it as a context manager).
        """
        cnx = self.connect()
        try:
            cnx.start_transaction(consistent_snapshot, isolation_level, readonly)
            return Transaction(cnx)
        except Exception:
            # Nobody else holds this connection yet, so release it here
            # instead of leaking it.
            cnx.close()
            raise
class Transaction(object):
    """Run several statements on one pooled connection as a unit.

    Usable as a context manager: leaving the ``with`` body normally
    commits, leaving it with an exception rolls back, and in both cases
    the cursor and the connection are closed afterwards.
    """

    def __init__(self, connection):
        """Wrap ``connection`` and open a buffered dictionary cursor on it.

        :param connection: a ``PooledMySQLConnection``.
        :raises AttributeError: if ``connection`` is of any other type.
        """
        self.cnx = None
        if not isinstance(connection, PooledMySQLConnection):
            raise AttributeError("connection should be a PooledMySQLConnection")
        self.cnx = connection
        self.cursor = connection.cursor(buffered=True, dictionary=True)

    def __enter__(self):
        """Return the transaction itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit on success, roll back on error, and always close.

        Returns ``None``, so an exception raised in the ``with`` body keeps
        propagating to the caller after the rollback.
        """
        try:
            if exc_type is None and exc_val is None and exc_tb is None:
                self.commit()
            else:
                # will raise with-body's Exception, should deal with it
                self.rollback()
        finally:
            # Release the resources even if commit()/rollback() failed.
            self.close()

    def query(self, operation, params=None):
        """Execute ``operation`` and return ``cursor.fetchall()``."""
        self.cursor.execute(operation, params)
        return self.cursor.fetchall()

    def get(self, operation, params=None):
        """Execute ``operation`` and return ``cursor.fetchone()``."""
        self.cursor.execute(operation, params)
        return self.cursor.fetchone()

    def insert(self, operation, params=None):
        """Execute ``operation`` and return the cursor's ``lastrowid``."""
        self.cursor.execute(operation, params)
        return self.cursor.lastrowid

    def insert_many(self, operation, seq_params):
        """Run ``executemany(operation, seq_params)``; return ``rowcount``."""
        self.cursor.executemany(operation, seq_params)
        return self.cursor.rowcount

    def execute(self, operation, params=None):
        """Execute ``operation`` and return the cursor's ``rowcount``."""
        self.cursor.execute(operation, params)
        return self.cursor.rowcount

    def update(self, operation, params=None):
        """Alias of ``execute``; returns the cursor's ``rowcount``."""
        return self.execute(operation, params)

    def delete(self, operation, params=None):
        """Alias of ``execute``; returns the cursor's ``rowcount``."""
        return self.execute(operation, params)

    def commit(self):
        """Commit on the wrapped connection."""
        self.cnx.commit()

    def rollback(self):
        """Roll back on the wrapped connection."""
        self.cnx.rollback()

    def close(self):
        """Close the cursor, then the connection.

        The connection is closed even when closing the cursor raises.
        """
        try:
            self.cursor.close()
        finally:
            self.cnx.close()
|