Fwd: [CaptureChangeMysql] Cannot get real integer value when integer column set unsigned

2018-08-02 Thread Ashwin Konale
Hello guys,

We are trying out NiFi at Booking.com in one of our use cases: capturing MySQL
changes and dumping them to Hadoop. We are using the CaptureChangeMySQL
processor to read binlogs and process the events in later stages. So far
everything works fine, but when a column is of an unsigned primitive type, the
processor cannot read the data properly once the value overflows the signed
range. I investigated a bit and came across this issue (
https://github.com/shyiko/mysql-binlog-connector-java/issues/104). The
library used internally by the CaptureChangeMySQL processor (
https://github.com/shyiko/mysql-binlog-connector-java) does not handle this
case, and it is up to the consumer to handle it. More details can be found in
the link.

Before writing any kind of patch, I wanted to check: since this is quite a
common case, has anyone come across it? It would be really great if someone
could suggest a solution.

NiFi version   : 1.6.0
MySQL version  : 5.7.22-log
Binlog type    : Row-based logging

Thanks a lot

Ashwin Konale
Developer

Booking.com B.V.


[CaptureChangeMysql] Cannot get real value of primitive type when column type is unsigned.

2018-08-08 Thread ashwin konale
Hello all,

I recently started using NiFi to read binlogs and ingest them into various
sinks. I noticed that when the column type is unsigned, the underlying
library sends the signed version of the value (e.g., for an unsigned TINYINT
column, when the inserted value is 128 it sends -128). This is related to
https://github.com/shyiko/mysql-binlog-connector-java/issues/104 . Has
anyone come across this? Is there any workaround other than patching the
CaptureChangeMySQL processor?
Any help would be much appreciated.
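For reference, recovering the unsigned value on the consumer side is just a two's-complement reinterpretation with a bitmask. A minimal sketch (the class and method names are mine for illustration, not part of NiFi or the connector API):

```java
// Reinterpret the signed values the binlog connector emits as unsigned.
// Illustrative only — not the actual NiFi/connector API.
public class UnsignedRecovery {

    // Unsigned TINYINT: keep the low 8 bits.
    static int unsignedTinyInt(byte signed) {
        return signed & 0xFF;
    }

    // Unsigned INT: keep the low 32 bits.
    static long unsignedInt(int signed) {
        return signed & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        System.out.println(unsignedTinyInt((byte) -128)); // prints 128
        System.out.println(unsignedInt(-1));              // prints 4294967295
    }
}
```

The same mask pattern extends to SMALLINT (`& 0xFFFF`) and MEDIUMINT (`& 0xFFFFFF`); unsigned BIGINT needs `BigInteger`, since Java has no 64-bit unsigned primitive.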

Ashwin


How to optimise use of MergeContent for large number of bins.

2018-09-21 Thread ashwin konale
I have a NiFi workflow that reads from multiple MySQL binlogs and puts the
data to HDFS. I am using CaptureChangeMySQL as the source and PutHDFS as the
sink, with MergeContent processors in between to chunk the messages together
for HDFS.

[CDC (primary node only; about 200 of these processors, one per db)]
-> UpdateAttributes (db-table-hour)
-> MergeContent, 500 msgs (bin on db-table-hour)
-> MergeContent, 200 msgs (bin on db-table-hour)
-> PutHDFS

I have about ~200 databases to read from and ~2500 tables altogether. The
update rate of the binlogs is around 1 Mbps per database. I am planning to
run this on a 3-node NiFi cluster for now.

Has anyone used MergeContent with more than 2000 bins before? Does it scale
well? Can someone suggest improvements to the workflow, or alternatives?
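For concreteness, a first-stage MergeContent in a layout like this would be configured roughly as follows (the property names are the processor's standard ones; the values are my assumptions based on the flow described above):

```
Merge Strategy             : Bin-Packing Algorithm
Correlation Attribute Name : db-table-hour
Minimum Number of Entries  : 500
Maximum Number of Entries  : 500
Maximum number of Bins     : 2500    # roughly one bin per active table
Max Bin Age                : 5 min   # flush stale, partially filled bins
```

Setting Max Bin Age matters at this bin count: without it, low-traffic tables can hold bins open indefinitely.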

Thanks
Ashwin


Unable to operate nifi ui due to "java.net.SocketTimeoutException: Read timed out"

2018-09-25 Thread ashwin konale
Hi,

I am running a 3-node NiFi cluster (embedded ZooKeeper) with the following
flow. It pulls data from MySQL binlogs using CaptureChangeMySQL and sends it
to HDFS.

CDC [primary node only] -> outputPort -> RPG

inputPort -> MergeContent -> PutHdfs

I am using an RPG to distribute load among the other nodes in the cluster.
CDC reads a large amount of data (~10 Mbps), but as soon as I start the CDC
processor the NiFi UI hangs and I get "javax.ws.rs.ProcessingException:
java.net.SocketTimeoutException: Read timed out". I have gone through
various threads and tweaked certain properties:

nifi.cluster.node.connection.timeout = 30 seconds
nifi.cluster.node.read.timeout = 30 sec
nifi.cluster.node.protocol.threads = 20
nifi.web.jetty.threads = 400

There is nothing unusual in the logs, and the data flow seems to continue,
but I cannot figure out what is going wrong. Can someone please help me with
it? Let me know what more information I should provide to debug this.

Stacktrace:

2018-09-25 12:00:49,888 ERROR [LearnerHandler-/10.10.10.10:31955] o.a.z.server.quorum.LearnerHandler Unexpected exception causing shutdown while sock still open
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
    at org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:83)
    at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
    at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)
2018-09-25 12:00:51,901 INFO [Process Cluster Protocol Request-7] o.a.n.c.p.impl.SocketProtocolListener Finished processing request e181bdbd-00c7-4f3b-b69a-34ff7c1c9cee (type=HEARTBEAT, length=3565 bytes) from nifiapp-1002:8080 in 1 millis


Any help would be much appreciated.
Ashwin


Slow flowfile transfer from process group port to output port.

2018-09-25 Thread ashwin konale
Hi, I have the following flow and am trying to increase its throughput.
I am running a 3-node NiFi cluster (v1.6).

CDC -> processGroupOutputPort -> externalOutputPort
RPG -> downstream

FlowFiles are always queued up between the processGroupOutputPort and the
externalOutputPort, and I don't see any configuration to increase the number
of threads for this operation. I have set backpressure to 10,000 FlowFiles.
Since CDC produces at a much faster rate than this, I want to increase the
throughput of the flow. I also see in the UI that the externalOutputPort
always shows 3 running threads, no matter how much I increase the number of
threads to consume in the RPG.

Any pointers would be much appreciated.

Ashwin


Re: Slow flowfile transfer from process group port to output port.

2018-09-26 Thread Ashwin Konale
Hello Mike,
Thanks a lot for looking into it. I have already configured the threads for
the remote port in the RPG, but the congestion is in the transfer from the
process group to the external port, and there is no configuration option to
increase the number of threads on the externalOutputPort. The number of
running threads for the output port always shows as 3 in the UI, so I am not
sure how to increase throughput. I had a quick look at the implementation of
LocalPort.java; it looks like FlowFiles are transferred in batches of 100.
Can you give me any other suggestions to increase throughput?

-Ashwin


Re: Slow flowfile transfer from process group port to output port.

2018-09-26 Thread ashwin . konale



On 2018/09/25 15:34:03, Mark Payne  wrote: 
> Ashwin,
> 
> You'll want to Right-Click on the RPG and then choose to configure Remote 
> Ports. From there, you can
> configure how many threads should be used to pull data from each port. So 
> you've updated the Output Port
> to use up to 3 threads per node to provide the data, but each node is still 
> only using 1 thread to pull the data.
> By configuring the Remote Ports, you can configure the RPG to use 3 threads 
> to pull the data as well.
> 
> Thanks
> -Mark
> 
> 
> > On Sep 25, 2018, at 11:29 AM, ashwin konale  wrote:
> > 
> > Hi I have the following flow which I am trying to increase the throughput.
> > I am runnign 3 node nifi cluster(v1.6)
> > 
> > CDC -> processGroupOutputPort -> externalOutputPort
> > RPG -> downstream
> > 
> > Flowfiles are always queued up between processGroupOutputPort to
> > externalOutputPort. I dont see any configuration to increase the number of
> > threads for this operation as well. I have set backpressure to be 10,000
> > flowfiles. Since the CDC produces at much faster rate than this, I want to
> > increase the throughput of the flow. Also I see in the UI
> > externalOutputPort has number of threads running as 3 all the time no
> > matter how much I increase number of threads to consume in RPG.
> > 
> > Any pointers here will be much helpful.
> > 
> > Ashwin
> 
Hi Mark,
Thanks a lot for the reply. I have already configured the RPG to use more
threads on the remote ports. But since NiFi does not provide any option to
increase the number of threads on the externalOutputPort, it runs on 3
threads by default (I am not sure how it chose 3). So FlowFiles are always
blocked between the processGroupOutputPort and the externalOutputPort. I had
a quick look at the source of LocalPort.java; it looks like it transfers in
batches of 100. Can you provide some guidance on how to improve throughput?
Thanks
- Ashwin


Re: Unable to operate nifi ui due to "java.net.SocketTimeoutException: Read timed out"

2018-09-26 Thread ashwin . konale



On 2018/09/25 13:13:12, Mark Payne  wrote: 
> Hi Ashwin,
> 
> The embedded ZooKeeper is provided as a convenience so that you can easily 
> test running things in a cluster
> on a laptop, etc. However, it struggles when your nodes are handling any kind 
> of significant data rate. It is always
> recommended that an external ZooKeeper be used for any sort of production use.
> 
> Thanks
> -Mark
> 
Thanks Mark. Will try that.
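Concretely, pointing NiFi at an external ensemble is a change to nifi.properties along these lines (the property names are standard; the hostnames are placeholders):

```
# nifi.properties — use an external ZooKeeper ensemble instead of the embedded one
nifi.state.management.embedded.zookeeper.start=false
nifi.zookeeper.connect.string=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
```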


Re: Slow flowfile transfer from process group port to output port.

2018-09-26 Thread ashwin . konale



Hello Mark, thanks a lot for the reply.
I have already configured the number of threads on the remote ports in the
RPG. But no matter how many threads I set, the number of threads running on
the output port shows as 3, and there is no setting to change the number of
threads for the output port itself. I had a quick look at LocalPort.java
(https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java).
It looks like it transfers in batches of 100. Is there any way to increase
the throughput?


Fixing unstable nifi cluster.

2018-10-16 Thread ashwin konale
Hi,
We have a 3-node NiFi cluster (with separate ZooKeeper instances running on
the same machines) which pulls data from MySQL and writes to HDFS. I am
frequently running into problems with the cluster: nodes keep disconnecting
from each other, the primary node keeps switching, and sometimes the cluster
goes into a zombie state where I cannot access the UI at all. I have followed
the best-practices guide and tweaked parameters in nifi.properties, and I
switched the provenance repository implementation to volatile because the
cluster could not keep up with incoming traffic. The data traffic is not high
at all (4 Mbps). This is the message I frequently see in the logs:

INFO [main-EventThread] o.a.c.f.state.ConnectionStateManager State change: LOST
INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@56ebedec Connection State changed to LOST
INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@1b0e2055 Connection State changed to LOST
INFO [main-EventThread] o.a.c.f.state.ConnectionStateManager State change: RECONNECTED

Am I doing something wrong with the cluster setup? Can someone give me some
guidance on how to debug this issue, e.g., which system metrics to look at?
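One knob worth checking (my assumption, not something established in this thread): the default ZooKeeper timeouts in nifi.properties are only a few seconds, which GC pauses under load can exceed, producing exactly this LOST/RECONNECTED churn:

```
# nifi.properties — lengthen ZooKeeper timeouts to ride out GC pauses
nifi.zookeeper.connect.timeout=30 secs
nifi.zookeeper.session.timeout=30 secs
```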

Ashwin


Scaling source processors in nifi horizontally.

2018-10-17 Thread ashwin konale
Hi,

I am experimenting with NiFi for one of our use cases, with plans to extend
it to various other data-routing and ingestion use cases. Right now I need to
ingest data from MySQL binlogs to HDFS/GCS. We have around 250 different
schemas and about 3000 tables to read data from. The volume of the data flow
ranges from 500 to 2000 messages per second across the schemas.

Right now the problem is that the MySQL CDC processor can run in only one
thread. To overcome this I see two options:

1. Use primary-node execution, with a different processor for each schema.
But then all the processors that read from MySQL will run on a single node,
which will be a bottleneck no matter how big my NiFi cluster is.

2. Use multiple NiFi instances to pull the data, with a master NiFi cluster
for ingestion to the various sinks. In this approach I will have to manage
all these small NiFi instances, and may have to build some kind of tooling on
top of them to monitor/provision new processors for newly added schemas, etc.

Is there a better way to achieve my use case with NiFi? Please advise me on
the architecture.

Looking forward to suggestions.

- Ashwin


Re: Scaling source processors in nifi horizontally.

2018-10-18 Thread Ashwin Konale
Hi,

The flow is like this:

MysqlCDC -> UpdateAttributes -> MergeContent -> (PutHDFS, PutGCS)

But we have around 250 schemas to pull data from, so with a cluster setup:

MysqlCDC_schema1 -> RPG
MysqlCDC_schema2 -> RPG
MysqlCDC_schema3 -> RPG, and so on

InputPort -> UpdateAttributes -> MergeContent -> (PutHDFS, PutGCS)

But since MysqlCDC can run only on the primary node of the cluster, I will
end up running all of the input processors on a single node. This can easily
become a bottleneck as the number of schemas grows. Could you suggest an
alternative approach to this problem?


On 2018/10/17 21:14:09, Mike Thomsen wrote:
> > may have to build some kind of tooling on top of it to monitor/provision
> > new processors for newly added schemas etc.
>
> Could you elaborate on this part of your use case?


Reading avro encoded message key from kafka

2018-11-23 Thread ashwin konale
Hi,
I have key-value pairs of Avro messages in a Kafka topic I want to consume
from. I can easily modify the message value using the NiFi
ConsumeKafkaRecord processor, but it doesn't expose the key of the message.
The ConsumeKafka processor has a kafka.key attribute, but I am not sure how
to read its contents (since it is Avro-encoded) and add certain values as
attributes to the FlowFile. Any pointers would be much appreciated.

Thanks


Re: Reading avro encoded message key from kafka

2018-11-27 Thread Ashwin Konale
Hi,
It's not an encoding issue; I am not able to figure out how to read the
Kafka key itself, e.g.:

Kafka key = {type: foo, meta: etc, etc}
Kafka message = {Avro payload}

I want to use the RouteOnAttribute processor based on type = foo or bar. For
this to happen, I need to extract the value foo from the Kafka key into the
FlowFile. Basically, I cannot figure out how to read the key of a Kafka
message in NiFi and extract attributes from it. Could you suggest something
here?

Thanks


On 2018/11/23 15:14:53, Mike Thomsen wrote:
> If you are having encoding-related issues with reading that attribute, try
> switching to the Kafka string serializer in your producer.



Re: Reading avro encoded message key from kafka

2018-11-27 Thread Ashwin Konale
Hi,
Thanks a lot for the suggestion. I didn't know about the jsonPath EL
function; I can easily implement that in my use case.

- Ashwin
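If the producer is changed to serialize the key as JSON (the third option Bryan lists), the routing rule could be a RouteOnAttribute dynamic property like this (the property name `foo` and the `type` field come from the example key earlier in this thread):

```
# RouteOnAttribute dynamic property "foo":
${kafka.key:jsonPath('$.type'):equals('foo')}
```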

On 2018/11/27 18:52:05, Bryan Bende wrote:
> Unfortunately I don't think there is a good way to interpret the value
> of the key when it is Avro, because we don't have any expression
> language functions that understand Avro or record-oriented values.
>
> The main option would be to change how the data is being produced in some way...
>
> - Put the value you are interested in in a message header; then it
> will come across as a string key/value pair in a flow file attribute,
> and use RouteOnAttribute.
> - Put the value you are interested in in the message body somewhere,
> and use PartitionRecord to route on the value of the field in the message.
> - Use a different kind of key serialization, like JSON, which can then
> be parsed with expression language functions.
>
> A possible improvement we could make is to add some kind of
> "avro-to-json" EL function, then from there use the EL jsonPath function.
