[ https://issues.apache.org/jira/browse/AIRFLOW-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tanay Tummalapalli reassigned AIRFLOW-2136:
-------------------------------------------

    Assignee:     (was: Tanay Tummalapalli)

> MySqlOperator does not comply well with sql scripts with multiple statements with MySQL warnings
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2136
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2136
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.10.0
>            Reporter: Fabrice Dossin
>            Priority: Major
>
> Hello,
> Take a simple SQL script that produces warnings:
>
> {code:sql}
> CREATE TABLE IF NOT EXISTS test_table (a BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);
> {code}
> On the second execution with MySqlOperator, the tables already exist, so each statement raises a warning and the task fails because of those warnings.
>
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1514, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/operators/mysql_operator.py", line 55, in execute
>     parameters=self.parameters)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/hooks/dbapi_hook.py", line 165, in run
>     cur.execute(s)
>   File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
>     self.thing.close()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 84, in close
>     while self.nextset():
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 177, in nextset
>     self._warning_check()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 148, in _warning_check
>     warnings = db.show_warnings()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 381, in show_warnings
>     self.query("SHOW WARNINGS")
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 277, in query
>     _mysql.connection.query(self, query)
> _mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
> {noformat}
> The root cause seems to be that the MySQLdb cursor is designed to execute a list of statements one at a time, whereas a SQL file given as input is sent as a single string. My guess is that MySQLdb tries to read the warnings raised during the script, and since we cannot do a fetchall between statements, it fails with the usual "Commands out of sync" error.
>
> I fixed this by splitting the script into individual statements with the sqlparse library ([https://github.com/andialbrecht/sqlparse]) in a subclass of MySqlOperator:
> {code:python}
> import sqlparse
> from airflow.operators.mysql_operator import MySqlOperator
>
>
> class MySqlSplitOperator(MySqlOperator):
>     """MySqlOperator that runs each statement of a script separately."""
>
>     def execute(self, context):
>         # Accept either a single script or a list of scripts.
>         if isinstance(self.sql, str):
>             self.sql = [self.sql]
>         # Split every script into its individual statements.
>         statements = []
>         for script in self.sql:
>             statements.extend(sqlparse.split(script))
>         self.sql = statements
>         super().execute(context)
> {code}
> I do not know whether you would be interested in having this brought into the main code through a PR.
> If you are, just let me know and tell me where to integrate it.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)