I also tried commenting out the call to insert_minute_data and printing
len(data) instead. That works fine. With the DB call in place, it either
throws a "MySQL server has gone away" error or hangs partway through without
any error.
On Tuesday, August 5, 2014 6:34:06 AM UTC-5, Milind Vaidya wrote:
>
> Hi,
>
> I have a generic static save method that saves instances of all objects,
> as follows:
>
> engine = create_engine("mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>")
>
> class DBHandler(object):
>     '''Handles generic DB related operations'''
>
>     session = Session(bind=engine)
>
>     @staticmethod
>     @db_error_handler
>     def save(rows):
>         if rows is not None:
>             try:
>                 DBHandler.session.add_all(rows)
>                 DBHandler.session.commit()
>             except IntegrityError as ie:
>                 logger.debug("Error saving data, integrity constraint "
>                              "violation: %s", ie)
>                 raise DBIntegrityError("Error saving data, integrity "
>                                        "constraint violation: %s" % ie)
>
>     @staticmethod
>     @db_error_handler
>     def fetch_host(hostname):
>         # first() returns None when no row matches; one() would raise instead
>         host = DBHandler.session.query(Host).filter_by(
>             host_name=hostname).first()
>         if host is None:
>             host = Host(host_name=hostname, host_colo='unknown')
>         return host
>
>     @staticmethod
>     @db_error_handler
>     def insert_minute_data(minute_data):
>         minute_data_chunk = []
>         for data in minute_data:
>             host = DBHandler.fetch_host(data['host'])
>             track = DBHandler.fetch_track(data['track'])
>             timestamp = DBHandler.get_UTC_date(data['time'])
>             minute_data_chunk.append(PerMinuteTraffic(
>                 minute_track=track, minute_host=host, minute=timestamp,
>                 events_emitted=data['in'], events_received=data['out']))
>         DBHandler.save(minute_data_chunk)
>
>
>
> *Call from main:*
>
> with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
>     future_to_track = {executor.submit(compute, client, start_epoch_min,
>                                        end_epoch_min, track): track
>                        for track in tracks}
>     for future in concurrent.futures.as_completed(future_to_track):
>         track = future_to_track[future]
>         # try:
>         data = future.result()  # Tried printing data and looks good most of the time
>         logger.debug("Fetched the per host minute level data for the "
>                      "track: %s" % track)
>         DBHandler.insert_minute_data(data)
>
>
> 1. Is it a good idea to create static methods like this?
> 2. Is the approach of creating a new host if one is not present OK here?
> 3. After printing data for a couple of tracks in "insert_minute_data", it
> consistently throws the following error:
>
> Invalid Operation: (OperationalError) (2006, 'MySQL server has gone away')
> 'SELECT emitter_host.host_id AS emitter_host_host_id,
> emitter_host.host_name AS emitter_host_host_name,
> emitter_host.host_colo AS emitter_host_host_colo, emitter_host.property AS
> emitter_host_property \nFROM emitter_host \nWHERE emitter_host.host_name =
> %s' (u'somehostname',)
>
> SQLAlchemy error: Can't reconnect until invalid transaction is rolled back
> (original cause: InvalidRequestError: Can't reconnect until invalid
> transaction is rolled back) 'SELECT emitter_host.host_id AS
> emitter_host_host_id, emitter_host.host_name AS emitter_host_host_name,
> emitter_host.host_colo AS emitter_host_host_colo, emitter_host.property AS
> emitter_host_property \nFROM emitter_host \nWHERE emitter_host.host_name =
> %s' [immutabledict({})]
>
> Is it required to handle exceptions after every SQLAlchemy operation and
> call rollback() in the except block?
>
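
On the last question: the second error message says the connection cannot be
reused until the invalid transaction is rolled back, so a rollback in the
error path appears to be needed. Below is a minimal sketch of that pattern,
assuming a session factory such as SQLAlchemy's sessionmaker. The names
session_scope, save and FakeSession are hypothetical and used only for
illustration; FakeSession merely stands in for a real Session so that the
control flow can be run without a database.

```python
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory):
    """Yield a session, committing on success and rolling back on any error."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        # Roll back so the failed transaction does not block later work
        session.rollback()
        raise
    finally:
        session.close()


class FakeSession(object):
    """Hypothetical stand-in that only records calls; not part of SQLAlchemy."""

    def __init__(self):
        self.calls = []

    def add_all(self, rows):
        self.calls.append("add_all")
        if any(row is None for row in rows):
            raise ValueError("simulated database failure")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


def save(rows, session_factory):
    """Save rows in one transaction; return the session for inspection."""
    with session_scope(session_factory) as session:
        session.add_all(rows)
    return session
```

In a real application, something like sessionmaker(bind=engine) would take
the place of FakeSession, and each unit of work would get its own short-lived
session instead of one session shared at class level.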