Hi Ryan, If I understand well, you have a jolt map specification for each user. In that case you can put the relation guid, map_spec in some table of a external database. After that in your flow before JoltTransformJSON you create a custom processor to read the configuration from the table and transform the map_sec in an attribute you can put in jolt specification property of JoltTransformJSON processor, using expression language.
I created a simple general processor for read from a relational database and transform the fields in attributes , I can share with you: [cid:image001.png@01D45C17.2272F770] In pdaPool property you put the name of you DBCPConnectionPool. The code of custom processor named executeSqlOnFlowV1.grovy: import org.apache.nifi.processor.io.StreamCallback import java.nio.charset.StandardCharsets import org.apache.nifi.controller.ControllerService import java.sql.DriverManager import groovy.sql.Sql def getConnFromPool(dbServiceName) { def lookup = context.controllerServiceLookup def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == dbServiceName } def conn = lookup.getControllerService(dbcpServiceId)?.getConnection() return conn } def writeFlowFile(flowFile, str) { flowFile = session.write(flowFile, { outputStream -> str.eachLine { line -> outputStream.write(line.bytes) outputStream.write('\n'.bytes) } } as OutputStreamCallback) return flowFile } //Main Program def pda,stmt,rows, cols=[] def flowFile = session.get() if (!flowFile) return try { //create a DB Connection from Pool pda=new Sql( getConnFromPool(pdaPool.value)) stmt=query.evaluateAttributeExpressions(flowFile).value.toString() log.info("executeQuery:${stmt}") //create new attributes which names are the columns names and the values are the values of the columns pda.firstRow(stmt, {meta-> (1..meta.columnCount).each {cols << meta.getColumnName(it)} }) { cols.each {col-> flowFile=session.putAttribute(flowFile,col,"${it.getAt(col)}") } } session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error('executeQuery: error:', e) flowFile=writeFlowFile(flowFile,"Error executing query\n${e}\n${e.getStackTrace()}") session.transfer(flowFile, REL_FAILURE) } finally { pda?.close() } I hope this help. Carlos From: Ryan H [mailto:ryan.howell.developm...@gmail.com] Sent: quinta-feira, 4 de outubro de 2018 18:22 To: users@nifi.apache.org Subject: Dynamic Mapping Hi All, I have been working on an integration between two systems with NiFi in the middle as the integration point. Basically when an event happens on System-A, such as a record update, those changes need to be propagated to System-B. For the first use case, I have set up a data flow that listens for incoming requests from System-A, performs a mapping, the sends the mapped data to System-B. Generalized Flow for "Create_Event" (dumbed down significantly): System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event" This works great for the first case with a predefined mapping in JoltTransformJSON. Now I want to generalize it so that the same data flow can be used for all Create_Event's on System-A. Here is where the issue comes in. There is a base schema for System-A that has a base mapping to the base schema in System-B. Users of the System have the ability to "extend" the base schema to add/remove fields and modify the base mapping. So each time the Create_Event happens, the mapping that is used should be the unique mapping spec associated to that user (call it a GUID that comes along with the request). The data flow is the exact same for all Create_Events, except for the mapping, which will be unique to the user. Does anyone know of a way to load up a different mapping to be used on a per-request basis? I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec and can be modified to meet the needs of whatever would work for this. I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could be applied to this situation. Any Help is, as always, greatly appreciated. Cheers, Ryan H.