Hi Ryan,

If I understand correctly, you have a Jolt mapping specification for each user.
In that case you can store the (guid, map_spec) relation in a table in an
external database.
Then, in your flow, before JoltTransformJSON, add a custom processor that reads
the configuration from the table and puts the map_spec into a flow file
attribute. You can then reference that attribute in the Jolt Specification
property of the JoltTransformJSON processor using Expression Language.

I created a simple, general-purpose processor that reads from a relational
database and turns the fields into attributes. I can share it with you:



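In the pdaPool property, enter the name of your DBCPConnectionPool. The script
also reads a query property that holds the SQL statement; it is evaluated with
Expression Language against the incoming flow file.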
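
As a rough sketch of how the two processors could be wired together, the
properties might look like the fragment below. The controller service name,
the table name user_mapping, and the attribute name guid are assumptions for
illustration only; pdaPool and query are the properties the script reads.

```text
# Custom script processor (dynamic properties read by the script)
pdaPool : MyDBCPConnectionPool
query   : SELECT map_spec FROM user_mapping WHERE guid = '${guid}'

# JoltTransformJSON
Jolt Specification : ${map_spec}
```

The attribute name follows the column name reported by the database, so on a
database that returns upper-case column names the reference would be
${MAP_SPEC} instead. Since the GUID is interpolated directly into the SQL
text, it is probably worth validating it earlier in the flow.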

The code of the custom processor, named executeSqlOnFlowV1.groovy:

import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql


// Look up a controller service by its name and borrow a connection from it
def getConnFromPool(dbServiceName) {
    def lookup = context.controllerServiceLookup
    def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
        lookup.getControllerServiceName(cs) == dbServiceName
    }
    return lookup.getControllerService(dbcpServiceId)?.getConnection()
}


// Replace the flow file content with the given text, written line by line
def writeFlowFile(flowFile, str) {
    return session.write(flowFile, { outputStream ->
        str.eachLine { line ->
            outputStream.write(line.getBytes(StandardCharsets.UTF_8))
            outputStream.write('\n'.getBytes(StandardCharsets.UTF_8))
        }
    } as OutputStreamCallback)
}

// Main program
def pda, stmt
def flowFile = session.get()
if (!flowFile) return

try {
    // Open a connection from the pool named in the pdaPool property
    pda = new Sql(getConnFromPool(pdaPool.value))
    // Resolve Expression Language in the query property against the flow file
    stmt = query.evaluateAttributeExpressions(flowFile).value.toString()

    log.info("executeQuery: ${stmt}")

    // Create one attribute per column of the first row: the attribute name is
    // the column name and the attribute value is the column value.
    // firstRow returns null when the query yields no rows.
    def row = pda.firstRow(stmt)
    row?.each { col, value ->
        flowFile = session.putAttribute(flowFile, col.toString(), "${value}".toString())
    }

    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error('executeQuery: error:', e)
    flowFile = writeFlowFile(flowFile, "Error executing query\n${e}\n${e.getStackTrace()}")
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Closing the Sql object also closes the pooled connection it wraps
    pda?.close()
}
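
To see which attributes the script would produce without running NiFi or a
database, here is a minimal sketch in plain Groovy. The row map is a
hypothetical stand-in for the first row returned by the query; the column
names and values are made up for illustration.

```groovy
// Hypothetical row standing in for the first row of the query result.
// Column names and values are invented for this example.
def row = [
    GUID    : 'user-123',
    MAP_SPEC: '[{"operation":"shift","spec":{"name":"fullName"}}]',
    NOTES   : null
]

// Same idea as in the processor: one string attribute per column.
// A null column value becomes the literal string "null" through
// GString interpolation.
def attributes = row.collectEntries { name, value ->
    [(name): "${value}".toString()]
}

attributes.each { name, value -> println "${name} = ${value}" }
```

With a row like this, JoltTransformJSON would pick up the spec through
${MAP_SPEC}. If a null spec should be treated as an error, the flow needs to
check for the string "null" explicitly.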


I hope this helps.

Carlos


From: Ryan H [mailto:ryan.howell.developm...@gmail.com]
Sent: Thursday, October 4, 2018 18:22
To: users@nifi.apache.org
Subject: Dynamic Mapping

Hi All,

I have been working on an integration between two systems with NiFi in the 
middle as the integration point. Basically when an event happens on System-A, 
such as a record update, those changes need to be propagated to System-B. For 
the first use case, I have set up a data flow that listens for incoming 
requests from System-A, performs a mapping, then sends the mapped data to 
System-B.

Generalized Flow for "Create_Event" (dumbed down significantly):
System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP 
-> System-B "Create_Event"

This works great for the first case with a predefined mapping in 
JoltTransformJSON. Now I want to generalize it so that the same data flow can 
be used for all Create_Events on System-A.

Here is where the issue comes in. There is a base schema for System-A that has 
a base mapping to the base schema in System-B. Users of the System have the 
ability to "extend" the base schema to add/remove fields and modify the base 
mapping. So each time the Create_Event happens, the mapping that is used should 
be the unique mapping spec associated to that user (call it a GUID that comes 
along with the request).

The data flow is the exact same for all Create_Events, except for the mapping, 
which will be unique to the user.

Does anyone know of a way to load up a different mapping to be used on a 
per-request basis? I used JoltTransformJSON just as a proof of concept, so it 
does not need to be a Jolt spec and can be modified to meet the needs of 
whatever would work for this.

I started to look into schema registry, but kind of got lost a bit and wasn't 
sure if it could be applied to this situation.

Any help is, as always, greatly appreciated.


Cheers,

Ryan H.
