[ https://issues.apache.org/jira/browse/NIFI-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198973#comment-17198973 ]
zeyk commented on NIFI-7820: ---------------------------- Hi [~RobertoGarcia] , I have tried the above method with my own logic in it using ExecuteStreamCommand processor. The script throws up error stating "No module named mysql" error, but the module has been installed in local computed where nifi is installed, moreover the same script runs good from the terminal with hard coding the flowfile values. Please find the screenshots below !image-2020-09-20-17-36-02-743.png! !image-2020-09-20-17-37-49-409.png! !image-2020-09-20-17-37-12-487.png! Updated script: #!/usr/bin/env python3 import re import sys import string import mysql.connector from mysql.connector import errorcode config = { 'user':'data', 'password':'pass', 'host':'host', 'database':'ml' } try: cnx = mysql.connector.connect(**config) except mysql.connector.Erro as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) cursor = cnx.cursor() flowfile=sys.stdin.readline() table_name = flowfile['table_name'] col_sql = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"+table_name+"' AND data_type='enum'" cursor.execute(col_sql) column_name = str(cursor.fetchone()[0]) col_query = ( "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\\'','') enums " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE table_name='"+table_name+"' AND column_name='"+column_name+"'") cursor.execute(col_query) enum_row = str(cursor.fetchone()[0]) cursor.close() cnx.close() enum_val = enum_row.split(',') for col in flowfile['columns']: if col['name'] == str(column_name): col['value'] = enum_val[int(col['value']) - 1] print(flowfile) > How to connect to controller service DBCP connection pool and execute the sql > using that connection via python > --------------------------------------------------------------------------------------------------------------- > > Key: NIFI-7820 > URL: https://issues.apache.org/jira/browse/NIFI-7820 > Project: Apache NiFi > Issue Type: Task > Components: NiFi Stateless > Reporter: zeyk > Priority: Major > Attachments: enumsWithPython.xml, execute_script.py, > image-2020-09-20-01-44-32-117.png, image-2020-09-20-01-45-19-739.png, > image-2020-09-20-01-58-03-615.png, image-2020-09-20-17-36-02-743.png, > image-2020-09-20-17-37-12-487.png, image-2020-09-20-17-37-49-409.png > > > I have a python code updated with my own logic to replace the index of enums > with values, but in order to do so , i need to execute certain sql commands > to get some values and the best thing would be to get the dbcp connection > pool connection from nifi controller services and execute the commands , but > i dont know to how to implement that in python, since few modules such as > pymysql doesnt support in Nifi(Jython) ,Please find the code below: > Any kind of help would be highly appreciated. Thanks in advance > > {color:#89ddff}import{color}{color:#eeffff} json{color} > {color:#89ddff}import{color}{color:#eeffff} re{color} > {color:#89ddff}import{color}{color:#eeffff} sys{color} > {color:#89ddff}import{color}{color:#eeffff} traceback{color} > {color:#89ddff}from{color}{color:#eeffff} java.nio.charset > {color}{color:#89ddff}import{color}{color:#eeffff} StandardCharsets{color} > {color:#89ddff}from{color}{color:#eeffff} org.apache.commons.io > {color}{color:#89ddff}import{color}{color:#eeffff} IOUtils{color} > {color:#89ddff}from{color}{color:#eeffff} org.apache.nifi.processor.io > {color}{color:#89ddff}import{color}{color:#eeffff} StreamCallback{color} > {color:#89ddff}from{color}{color:#eeffff} org.python.core.util > {color}{color:#89ddff}import{color}{color:#eeffff} StringUtil{color} > {color:#c792ea}class{color}{color:#eeffff} > {color}{color:#ffcb6b}TransformCallback{color}{color:#89ddff}({color}{color:#c3e88d}StreamCallback{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff} > {color}{color:#82aaff}__init__{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#89ddff}pass{color} > {color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff} > {color}{color:#82aaff}process{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff},{color}{color:#eeffff} > > {color}{color:#ff5370}inputStream{color}{color:#89ddff},{color}{color:#eeffff} > {color}{color:#ff5370}outputStream{color}{color:#89ddff}):{color} > {color:#eeffff} {color}{color:#89ddff}try{color}{color:#eeffff}:{color} > {color:#89ddff} {color}{color:#546e7a}# Read input FlowFile > content{color} > {color:#eeffff} input_text > {color}{color:#c792ea}={color}{color:#eeffff} > IOUtils.toString{color}{color:#89ddff}({color}{color:#eeffff}inputStream, > StandardCharsets.UTF_8{color}{color:#89ddff}){color} > {color:#eeffff} input_obj > {color}{color:#c792ea}={color}{color:#eeffff} > json.loads{color}{color:#89ddff}({color}{color:#eeffff}input_text{color}{color:#89ddff}){color} > {color:#eeffff} {color} > {color:#eeffff} {color} > {color:#eeffff} {color}{color:#eeffff}table_name > {color}{color:#c792ea}={color}{color:#eeffff} > input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}table_name{color}{color:#89ddff}'{color}{color:#89ddff}]{color} > {color:#eeffff} column_name > {color}{color:#c792ea}={color}{color:#eeffff} > {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} > column_name {color}{color:#f78c6c}FROM{color}{color:#c3e88d} > INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} > table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' > {color}{color:#f78c6c}AND{color}{color:#c3e88d} > data_type{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}enum'{color}{color:#89ddff}"{color} > {color:#eeffff} enum_value_sql > {color}{color:#c792ea}={color}{color:#eeffff} > {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} > {color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}(column_type,{color}{color:#89ddff}'{color}{color:#c3e88d}enum',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d})',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}(',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}\'',{color}{color:#89ddff}'{color}{color:#c3e88d}') > enums {color}{color:#f78c6c}FROM{color}{color:#c3e88d} > INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} > table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' > {color}{color:#f78c6c}AND{color}{color:#c3e88d} > column_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+column_name+"'{color}{color:#89ddff}"{color} > {color:#eeffff} enum_value > {color}{color:#c792ea}={color}{color:#eeffff} > enum_value_sql.split{color}{color:#89ddff}({color}{color:#89ddff}'{color}{color:#c3e88d},{color}{color:#89ddff}'{color}{color:#89ddff}){color} > {color:#eeffff} {color}{color:#89ddff}for{color}{color:#eeffff} > col {color}{color:#c792ea}in{color}{color:#eeffff} > input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}columns{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}:{color} > {color:#eeffff} {color}{color:#89ddff}if{color}{color:#eeffff} > col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} > {color}{color:#c792ea}=={color}{color:#eeffff} > {color}{color:#b2ccd6}str{color}{color:#89ddff}({color}{color:#eeffff}column_name{color}{color:#89ddff}){color}{color:#eeffff}:{color} > {color:#eeffff} > col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} > {color}{color:#c792ea}={color}{color:#eeffff} > enum_value{color}{color:#89ddff}[{color}{color:#b2ccd6}int{color}{color:#89ddff}({color}{color:#eeffff}col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]){color}{color:#eeffff} > {color}{color:#c792ea}-{color}{color:#eeffff} > {color}{color:#f78c6c}1{color}{color:#89ddff}]{color} > {color:#eeffff} output_text > {color}{color:#c792ea}={color}{color:#eeffff} > json.dumps{color}{color:#89ddff}({color}{color:#eeffff}input_obj{color}{color:#89ddff}){color} > {color:#eeffff} > outputStream.write{color}{color:#89ddff}({color}{color:#eeffff}StringUtil.toBytes{color}{color:#89ddff}({color}{color:#eeffff}output_text{color}{color:#89ddff})){color} > {color:#eeffff} > {color}{color:#89ddff}except{color}{color:#eeffff}:{color} > {color:#eeffff} > traceback.print_exc{color}{color:#89ddff}({color}{color:#ff5370}file{color}{color:#c792ea}={color}{color:#eeffff}sys.stdout{color}{color:#89ddff}){color} > {color:#eeffff} {color}{color:#89ddff}raise{color} > {color:#eeffff}flowFile {color}{color:#c792ea}={color}{color:#eeffff} > session.get{color}{color:#89ddff}(){color} > {color:#89ddff}if{color}{color:#eeffff} flowFile > {color}{color:#c792ea}!={color}{color:#eeffff} > {color}{color:#f78c6c}None{color}{color:#eeffff}:{color} > {color:#eeffff} flowFile {color}{color:#c792ea}={color}{color:#eeffff} > session.write{color}{color:#89ddff}({color}{color:#eeffff}flowFile, > TransformCallback{color}{color:#89ddff}()){color} > {color:#89ddff} {color}{color:#546e7a}# Finish by transferring the > FlowFile to an output relationship{color} > {color:#eeffff}session.transfer{color}{color:#89ddff}({color}{color:#eeffff}flowFile, > REL_SUCCESS{color}{color:#89ddff}){color} -- This message was sent by Atlassian Jira (v8.3.4#803005)