[ 
https://issues.apache.org/jira/browse/AIRFLOW-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Tummalapalli reassigned AIRFLOW-2136:
-------------------------------------------

    Assignee:     (was: Tanay Tummalapalli)

> MySqlOperator does not comply well with sql scripts with multiple statements with MySQL warnings
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2136
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2136
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.10.0
>            Reporter: Fabrice Dossin
>            Priority: Major
>
> Hello,
> Take a simple SQL script that produces warnings:
>  
> {code:java}
> CREATE TABLE IF NOT EXISTS test_table  (a BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table2 (b BIGINT NOT NULL PRIMARY KEY);
> CREATE TABLE IF NOT EXISTS test_table3 (b BIGINT NOT NULL PRIMARY KEY);
> {code}
> When the script is executed a second time with MySqlOperator, the tables
> already exist, so the CREATE TABLE IF NOT EXISTS statements produce warnings
> and the task fails:
>  
>  
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/airflow/models.py", line 1514, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/operators/mysql_operator.py", line 55, in execute
>     parameters=self.parameters)
>   File "/usr/local/lib/python3.6/dist-packages/airflow/hooks/dbapi_hook.py", line 165, in run
>     cur.execute(s)
>   File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
>     self.thing.close()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 84, in close
>     while self.nextset():
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 177, in nextset
>     self._warning_check()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 148, in _warning_check
>     warnings = db.show_warnings()
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 381, in show_warnings
>     self.query("SHOW WARNINGS")
>   File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 277, in query
>     _mysql.connection.query(self, query)
> _mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
> {noformat}
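> The underlying problem is that the MySQLdb cursor is meant to execute
> statements one at a time (MySqlOperator accepts a list of statements for
> this), whereas a SQL file used as input is sent as a single string. My guess
> is that MySQLdb tries to read the warnings raised during the script (the
> SHOW WARNINGS call in the traceback above), but since no fetchall happens
> between statements, the query fails with the usual "Commands out of sync"
> error.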
>  
> I worked around this by splitting the script into individual statements with
> the sqlparse library ([https://github.com/andialbrecht/sqlparse]) in a
> subclass of MySqlOperator:
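> {code:python}
> from airflow.operators.mysql_operator import MySqlOperator
> import sqlparse
>
>
> class MySqlSplitOperator(MySqlOperator):
>     """MySqlOperator that runs each statement of a script separately."""
>
>     def execute(self, context):
>         # Accept both a single (rendered) script and a list of scripts.
>         if isinstance(self.sql, str):
>             self.sql = [self.sql]
>         split_list = []
>         for s in self.sql:
>             # sqlparse.split returns one string per statement; skip empty
>             # fragments so MySQL is never sent an empty query.
>             split_list.extend(stmt for stmt in sqlparse.split(s) if stmt.strip())
>         self.sql = split_list
>         # DbApiHook.run then calls cur.execute() once per statement.
>         super().execute(context)
> {code}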
> I do not know whether you are interested in bringing this back into the main
> code base with a PR. Just ask if you are interested and tell me where to
> integrate it.
>  
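If adding sqlparse as a dependency is not an option, a small stdlib-only splitter could stand in for `sqlparse.split` in the subclass above. This is a hypothetical sketch (`split_statements` is not part of Airflow, MySQLdb or sqlparse), and it assumes the script uses no DELIMITER blocks (stored procedures, triggers), does not rely on `/*! ... */` version comments (all comments are dropped), and uses MySQL's default backslash escaping inside string literals. Unlike sqlparse, it returns statements without their trailing semicolons.

```python
from typing import List, Optional


def split_statements(script: str) -> List[str]:
    """Split a MySQL script into single statements on top-level semicolons.

    Hypothetical stdlib-only stand-in for sqlparse.split. Assumes no DELIMITER
    blocks, no reliance on /*! ... */ comments (all comments are dropped),
    and backslash escapes inside string literals (MySQL's default mode).
    """
    statements: List[str] = []
    buf: List[str] = []
    quote: Optional[str] = None  # currently open quote: ', " or `
    i, n = 0, len(script)

    def flush() -> None:
        stmt = "".join(buf).strip()
        if stmt:  # skip empty fragments such as ";;" or trailing whitespace
            statements.append(stmt)
        buf.clear()

    while i < n:
        ch = script[i]
        if quote is not None:
            buf.append(ch)
            if ch == "\\" and quote != "`" and i + 1 < n:
                buf.append(script[i + 1])  # keep the escaped char verbatim
                i += 2
                continue
            if ch == quote:
                quote = None  # a doubled '' simply closes and reopens
            i += 1
        elif ch in ("'", '"', "`"):
            quote = ch
            buf.append(ch)
            i += 1
        elif ch == "#" or (script.startswith("--", i)
                           and (i + 2 == n or script[i + 2] in " \t\r\n")):
            # Line comment: skip to the newline (the newline itself is kept).
            end = script.find("\n", i)
            i = n if end == -1 else end
        elif script.startswith("/*", i):
            # Block comment: skip it, leaving a space so tokens do not merge.
            end = script.find("*/", i + 2)
            i = n if end == -1 else end + 2
            buf.append(" ")
        elif ch == ";":
            flush()
            i += 1
        else:
            buf.append(ch)
            i += 1
    flush()  # the last statement may lack a trailing semicolon
    return statements
```

In MySqlSplitOperator, `split_list.extend(split_statements(s))` would then replace the sqlparse call. sqlparse remains the more robust choice, since it tokenizes full SQL rather than relying on the assumptions listed above.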



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
