[ 
https://issues.apache.org/jira/browse/NIFI-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198973#comment-17198973
 ] 

zeyk commented on NIFI-7820:
----------------------------

Hi [~RobertoGarcia],

I tried the above method with my own logic in it, using the 
ExecuteStreamCommand processor. The script fails with the error "No module 
named mysql", even though the module is installed on the local computer where 
NiFi is installed. Moreover, the same script runs fine from the terminal when 
the flowfile values are hard-coded. Please find the screenshots below.

!image-2020-09-20-17-36-02-743.png!

 

!image-2020-09-20-17-37-49-409.png!

 

!image-2020-09-20-17-37-12-487.png!
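
One thing that may be worth checking for the "No module named mysql" error is whether the interpreter NiFi launches is the same one the package was installed into (for example, a different python3 on the PATH of the user running NiFi). The following is only a diagnostic sketch, assuming it is run once from the terminal and once through ExecuteStreamCommand so the two outputs can be compared. The helper name `describe_interpreter` is made up for this illustration.

```python
#!/usr/bin/env python3
import importlib.util
import sys


def describe_interpreter(module_name="mysql"):
    """Report which interpreter is running and whether module_name can be found."""
    return {
        "executable": sys.executable,
        "version": sys.version.split()[0],
        # find_spec returns None when a top-level package is not on sys.path
        "module_found": importlib.util.find_spec(module_name) is not None,
        "sys_path": list(sys.path),
    }


if __name__ == "__main__":
    for key, value in describe_interpreter().items():
        print("{}: {}".format(key, value))
```

If the two runs report different executables or search paths, installing the package for the interpreter NiFi uses, or pointing the processor at the full path of the terminal's interpreter, would be the next thing to try.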

 

Updated script:

 

#!/usr/bin/env python3

import json
import sys

import mysql.connector
from mysql.connector import errorcode

config = {
    'user': 'data',
    'password': 'pass',
    'host': 'host',
    'database': 'ml'
}

try:
    cnx = mysql.connector.connect(**config)
except mysql.connector.Error as err:
    if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("Something is wrong with your user name or password",
              file=sys.stderr)
    elif err.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist", file=sys.stderr)
    else:
        print(err, file=sys.stderr)
    # Without a connection nothing below can run.
    sys.exit(1)

cursor = cnx.cursor()

# The flowfile content arrives on stdin as JSON.
flowfile = json.loads(sys.stdin.read())
table_name = flowfile['table_name']

# Find the enum column of the table.
col_sql = (
    "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE table_name = %s AND data_type = 'enum'")
cursor.execute(col_sql, (table_name,))
column_name = str(cursor.fetchone()[0])

# Strip "enum", the parentheses and the quotes from the column type,
# leaving a comma-separated list of the enum values.
col_query = (
    "SELECT REPLACE(REPLACE(REPLACE(REPLACE("
    "column_type,'enum',''),')',''),'(',''),'\\'','') enums "
    "FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE table_name = %s AND column_name = %s")
cursor.execute(col_query, (table_name, column_name))

enum_row = str(cursor.fetchone()[0])
cursor.close()
cnx.close()
enum_val = enum_row.split(',')

# Enum indexes are 1-based; replace each index with its value.
for col in flowfile['columns']:
    if col['name'] == column_name:
        col['value'] = enum_val[int(col['value']) - 1]

# Write the updated flowfile back to stdout as JSON.
print(json.dumps(flowfile))
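
The index-to-value step at the end of the script can be exercised without a database, which may help separate import problems from logic problems. A minimal sketch, assuming the flowfile is JSON with a `columns` list of `name`/`value` entries as in the script above. The function name `replace_enum_indexes` and the sample table, column, and enum labels are invented for illustration.

```python
import json


def replace_enum_indexes(flowfile, column_name, enum_values):
    """Replace the 1-based enum index in the matching column with its label."""
    for col in flowfile["columns"]:
        if col["name"] == column_name:
            col["value"] = enum_values[int(col["value"]) - 1]
    return flowfile


if __name__ == "__main__":
    # Hypothetical flowfile content and enum labels, for illustration only.
    sample = json.loads(
        '{"table_name": "orders", '
        '"columns": [{"name": "status", "value": "2"}]}')
    result = replace_enum_indexes(sample, "status", ["new", "paid", "shipped"])
    print(json.dumps(result))
```

Keeping this step in its own function means the database lookup only has to supply `column_name` and `enum_values`, whichever way the connection is obtained.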

> How to connect to controller service DBCP connection pool and execute the sql 
> using that connection via python 
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-7820
>                 URL: https://issues.apache.org/jira/browse/NIFI-7820
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: NiFi Stateless
>            Reporter: zeyk
>            Priority: Major
>         Attachments: enumsWithPython.xml, execute_script.py, 
> image-2020-09-20-01-44-32-117.png, image-2020-09-20-01-45-19-739.png, 
> image-2020-09-20-01-58-03-615.png, image-2020-09-20-17-36-02-743.png, 
> image-2020-09-20-17-37-12-487.png, image-2020-09-20-17-37-49-409.png
>
>
> I have Python code, updated with my own logic, that replaces enum indexes 
> with their values. To do so, I need to execute certain SQL commands to get 
> some values, and the best approach would be to obtain a connection from the 
> DBCP connection pool in the NiFi controller services and execute the 
> commands through it. However, I don't know how to implement that in Python, 
> since some modules such as pymysql are not supported in NiFi (Jython). 
> Please find the code below.
> Any kind of help would be highly appreciated. Thanks in advance.
>  
> import json
> import re
> import sys
> import traceback
> from java.nio.charset import StandardCharsets
> from org.apache.commons.io import IOUtils
> from org.apache.nifi.processor.io import StreamCallback
> from org.python.core.util import StringUtil
> class TransformCallback(StreamCallback):
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream):
>         try:
>             # Read input FlowFile content
>             input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
>             input_obj = json.loads(input_text)
>
>             table_name = input_obj['table_name']
>             column_name = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"+table_name+"' AND data_type='enum'"
>             enum_value_sql = "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"+table_name+"' AND column_name='"+column_name+"'"
>             enum_value = enum_value_sql.split(',')
>             for col in input_obj['columns']:
>                 if col['name'] == str(column_name):
>                     col['value'] = enum_value[int(col['value']) - 1]
>             output_text = json.dumps(input_obj)
>             outputStream.write(StringUtil.toBytes(output_text))
>         except:
>             traceback.print_exc(file=sys.stdout)
>             raise
> flowFile = session.get()
> if flowFile != None:
>     flowFile = session.write(flowFile, TransformCallback())
>     # Finish by transferring the FlowFile to an output relationship
> session.transfer(flowFile, REL_SUCCESS)
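
For the question in the issue itself, one commonly described route is to look the DBCP controller service up by name from inside ExecuteScript and use the `java.sql.Connection` it hands out. The sketch below is not a tested NiFi flow. It assumes the `ControllerServiceLookup` methods `getControllerServiceIdentifiers`, `getControllerServiceName` and `getControllerService` behave as their names suggest, and `find_controller_service` and `'MyDBCPService'` are placeholder names.

```python
def find_controller_service(lookup, service_type, service_name):
    """Return the controller service whose display name is service_name, or None."""
    for service_id in lookup.getControllerServiceIdentifiers(service_type):
        if lookup.getControllerServiceName(service_id) == service_name:
            return lookup.getControllerService(service_id)
    return None


# Inside an ExecuteScript (Jython) body this might be used roughly as follows.
# 'MyDBCPService' stands for the name of your DBCPConnectionPool service.
#
#   from org.apache.nifi.controller import ControllerService
#   dbcp = find_controller_service(context.getControllerServiceLookup(),
#                                  ControllerService, 'MyDBCPService')
#   conn = dbcp.getConnection()          # a java.sql.Connection
#   try:
#       stmt = conn.prepareStatement(
#           "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS "
#           "WHERE table_name = ? AND data_type = 'enum'")
#       stmt.setString(1, table_name)
#       rs = stmt.executeQuery()
#       ...
#   finally:
#       conn.close()                     # hands the connection back to the pool
```

Because the connection comes from the JVM side, this route would not need a Python MySQL module such as pymysql at all. The queries would go through JDBC instead.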



