[ https://issues.apache.org/jira/browse/NIFI-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198908#comment-17198908 ]
Roberto Garcia edited comment on NIFI-7820 at 9/20/20, 6:06 AM: ---------------------------------------------------------------- tested in Python 3.8.5, nifi-1.12.0, MySQL 8.0.21 !image-2020-09-20-01-44-32-117.png|width=911,height=360! !image-2020-09-20-01-58-03-615.png|width=773,height=545! !image-2020-09-20-01-45-19-739.png|width=722,height=505! h3. */home/sample/tmp/sample.py* {code:java} #!/usr/bin/env python3 import re import sys import string import mysql.connector from mysql.connector import errorcode config = { 'user':'sample', 'password':'myStrongPassword', 'host':'127.0.0.1', 'database':'sample' } try: cnx = mysql.connector.connect(**config) except mysql.connector.Erro as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) cursor = cnx.cursor() query = ( "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\\'','') enums " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE table_name='sample_with_enum' AND column_name='gender'") cursor.execute(query) row = str(cursor.fetchone()[0]) cursor.close() cnx.close() flowfile=sys.stdin.readline() match = re.search('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', flowfile, re.DOTALL) if match: index = int(match.group(1)) -1 enums = row.split(',') newFlowfile = re.sub('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', f'"{enums[index]}"', flowfile, 0, re.DOTALL) print(newFlowfile) {code} was (Author: robertogarcia): tested in Python 3.8.5, nifi-1.12.0, MySQL 8.0.21 !image-2020-09-20-01-44-32-117.png|width=1088,height=430! !image-2020-09-20-01-58-03-615.png|width=696,height=491! !image-2020-09-20-01-45-19-739.png|width=646,height=452! h3. */home/sample/tmp/sample.py* {code:java} #!/usr/bin/env python3 import re import sys import string import mysql.connector from mysql.connector import errorcode config = { 'user':'sample', 'password':'myStrongPassword', 'host':'127.0.0.1', 'database':'sample' } try: cnx = mysql.connector.connect(**config) except mysql.connector.Erro as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) cursor = cnx.cursor() query = ( "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\\'','') enums " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE table_name='sample_with_enum' AND column_name='gender'") cursor.execute(query) row = str(cursor.fetchone()[0]) cursor.close() cnx.close() flowfile=sys.stdin.readline() match = re.search('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', flowfile, re.DOTALL) if match: index = int(match.group(1)) -1 enums = row.split(',') newFlowfile = re.sub('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', f'"{enums[index]}"', flowfile, 0, re.DOTALL) print(newFlowfile) {code} > How to connect to controller service DBCP connection pool and execute the sql > using that connection via python > --------------------------------------------------------------------------------------------------------------- > > Key: NIFI-7820 > URL: https://issues.apache.org/jira/browse/NIFI-7820 > Project: Apache NiFi > Issue Type: Task > Components: NiFi Stateless > Reporter: zeyk > Priority: Major > Attachments: enumsWithPython.xml, execute_script.py, > image-2020-09-20-01-44-32-117.png, image-2020-09-20-01-45-19-739.png, > image-2020-09-20-01-58-03-615.png > > > I have a python code updated with my own logic to replace the index of enums > with values, but in order to do so , i need to execute certain sql commands > to get some values and the best thing would be to get the dbcp connection > pool connection from nifi controller services and execute the commands , but > i dont know to how to implement that in python, since few modules such as > pymysql doesnt support in Nifi(Jython) ,Please find the code below: > Any kind of help would be highly appreciated. Thanks in advance > > {color:#89ddff}import{color}{color:#eeffff} json{color} > {color:#89ddff}import{color}{color:#eeffff} re{color} > {color:#89ddff}import{color}{color:#eeffff} sys{color} > {color:#89ddff}import{color}{color:#eeffff} traceback{color} > {color:#89ddff}from{color}{color:#eeffff} java.nio.charset > {color}{color:#89ddff}import{color}{color:#eeffff} StandardCharsets{color} > {color:#89ddff}from{color}{color:#eeffff} org.apache.commons.io > {color}{color:#89ddff}import{color}{color:#eeffff} IOUtils{color} > {color:#89ddff}from{color}{color:#eeffff} org.apache.nifi.processor.io > {color}{color:#89ddff}import{color}{color:#eeffff} StreamCallback{color} > {color:#89ddff}from{color}{color:#eeffff} org.python.core.util > {color}{color:#89ddff}import{color}{color:#eeffff} StringUtil{color} > {color:#c792ea}class{color}{color:#eeffff} > {color}{color:#ffcb6b}TransformCallback{color}{color:#89ddff}({color}{color:#c3e88d}StreamCallback{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff} > {color}{color:#82aaff}__init__{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#89ddff}pass{color} > {color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff} > {color}{color:#82aaff}process{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff},{color}{color:#eeffff} > > {color}{color:#ff5370}inputStream{color}{color:#89ddff},{color}{color:#eeffff} > {color}{color:#ff5370}outputStream{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#89ddff}try{color}{color:#eeffff}:{color} > {color:#89ddff} {color}{color:#546e7a}# Read input FlowFile > content{color} > {color:#eeffff} input_text > {color}{color:#c792ea}={color}{color:#eeffff} > IOUtils.toString{color}{color:#89ddff}({color}{color:#eeffff}inputStream, > StandardCharsets.UTF_8{color}{color:#89ddff}){color} > {color:#eeffff} input_obj > {color}{color:#c792ea}={color}{color:#eeffff} > json.loads{color}{color:#89ddff}({color}{color:#eeffff}input_text{color}{color:#89ddff}){color} > {color:#eeffff} {color} > {color:#eeffff} {color} > {color:#eeffff} {color}{color:#eeffff}table_name > {color}{color:#c792ea}={color}{color:#eeffff} > input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}table_name{color}{color:#89ddff}'{color}{color:#89ddff}]{color} > {color:#eeffff} column_name > {color}{color:#c792ea}={color}{color:#eeffff} > {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} > column_name {color}{color:#f78c6c}FROM{color}{color:#c3e88d} > INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} > table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' > {color}{color:#f78c6c}AND{color}{color:#c3e88d} > data_type{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}enum'{color}{color:#89ddff}"{color} > {color:#eeffff} enum_value_sql > {color}{color:#c792ea}={color}{color:#eeffff} > {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} > {color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}(column_type,{color}{color:#89ddff}'{color}{color:#c3e88d}enum',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d})',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}(',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}\'',{color}{color:#89ddff}'{color}{color:#c3e88d}') > enums {color}{color:#f78c6c}FROM{color}{color:#c3e88d} > INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} > table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' > {color}{color:#f78c6c}AND{color}{color:#c3e88d} > column_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+column_name+"'{color}{color:#89ddff}"{color} > {color:#eeffff} enum_value > {color}{color:#c792ea}={color}{color:#eeffff} > enum_value_sql.split{color}{color:#89ddff}({color}{color:#89ddff}'{color}{color:#c3e88d},{color}{color:#89ddff}'{color}{color:#89ddff}){color} > {color:#eeffff} {color}{color:#89ddff}for{color}{color:#eeffff} > col {color}{color:#c792ea}in{color}{color:#eeffff} > input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}columns{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}:{color} > {color:#eeffff} {color}{color:#89ddff}if{color}{color:#eeffff} > col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} > {color}{color:#c792ea}=={color}{color:#eeffff} > {color}{color:#b2ccd6}str{color}{color:#89ddff}({color}{color:#eeffff}column_name{color}{color:#89ddff}){color}{color:#eeffff}:{color} > {color:#eeffff} > col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} > {color}{color:#c792ea}={color}{color:#eeffff} > enum_value{color}{color:#89ddff}[{color}{color:#b2ccd6}int{color}{color:#89ddff}({color}{color:#eeffff}col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]){color}{color:#eeffff} > {color}{color:#c792ea}-{color}{color:#eeffff} > {color}{color:#f78c6c}1{color}{color:#89ddff}]{color} > {color:#eeffff} output_text > {color}{color:#c792ea}={color}{color:#eeffff} > json.dumps{color}{color:#89ddff}({color}{color:#eeffff}input_obj{color}{color:#89ddff}){color} > {color:#eeffff} > outputStream.write{color}{color:#89ddff}({color}{color:#eeffff}StringUtil.toBytes{color}{color:#89ddff}({color}{color:#eeffff}output_text{color}{color:#89ddff})){color} > {color:#eeffff} > {color}{color:#89ddff}except{color}{color:#eeffff}:{color} > {color:#eeffff} > traceback.print_exc{color}{color:#89ddff}({color}{color:#ff5370}file{color}{color:#c792ea}={color}{color:#eeffff}sys.stdout{color}{color:#89ddff}){color} > {color:#eeffff} {color}{color:#89ddff}raise{color} > {color:#eeffff}flowFile {color}{color:#c792ea}={color}{color:#eeffff} > session.get{color}{color:#89ddff}(){color} > {color:#89ddff}if{color}{color:#eeffff} flowFile > {color}{color:#c792ea}!={color}{color:#eeffff} > {color}{color:#f78c6c}None{color}{color:#eeffff}:{color} > {color:#eeffff} flowFile {color}{color:#c792ea}={color}{color:#eeffff} > session.write{color}{color:#89ddff}({color}{color:#eeffff}flowFile, > TransformCallback{color}{color:#89ddff}()){color} > {color:#89ddff} {color}{color:#546e7a}# Finish by transferring the > FlowFile to an output relationship{color} > {color:#eeffff}session.transfer{color}{color:#89ddff}({color}{color:#eeffff}flowFile, > REL_SUCCESS{color}{color:#89ddff}){color} -- This message was sent by Atlassian Jira (v8.3.4#803005)