Fwd: [CaptureChangeMysql] Cannot get real integer value when integer column set unsigned
Hello guys,

We are trying out NiFi in one of our use cases at Booking.com, to capture MySQL changes and dump them to Hadoop. We are using the CaptureChangeMySQL processor to read binlogs and process them in later stages. So far everything works fine, but when a column is of an unsigned primitive type, the processor cannot read the value properly once it overflows the signed range. I investigated a bit and came across this issue: https://github.com/shyiko/mysql-binlog-connector-java/issues/104. The library used internally by the CaptureChangeMySQL processor (https://github.com/shyiko/mysql-binlog-connector-java) does not handle this case, and it is up to the consumer to handle it; more detail can be found in the link.

Before writing any kind of patch, I wanted to check: since this is quite a common case, has anyone come across it? It would be really great if someone could suggest a solution.

NiFi version: 1.6.0
MySQL version: 5.7.22-log
Binlog type: row-based logging

Thanks a lot,
Ashwin Konale
Developer, Booking.com B.V.
[CaptureChangeMysql] Cannot get real value of primitive type when column type is unsigned.
Hello all,

I recently started using NiFi to read binlogs and ingest them into various sinks. I noticed that when a column type is unsigned, the underlying library delivers the signed interpretation of the value (e.g., for an unsigned TINYINT column, an inserted value of 128 comes through as -128). This is related to https://github.com/shyiko/mysql-binlog-connector-java/issues/104.

Has anyone come across this? Is there any workaround other than patching the CaptureChangeMySQL processor? Any help would be much appreciated.

Ashwin
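Until the processor handles this itself, a consumer can reinterpret the signed primitives on its own. Below is a minimal sketch in plain Java (the class and method names are mine, not part of NiFi or the binlog library) using the JDK's unsigned-conversion helpers:

```java
import java.math.BigInteger;

public final class UnsignedFix {

    // TINYINT UNSIGNED arrives as a signed byte: -128 should read as 128.
    public static int fromTinyint(byte v) {
        return Byte.toUnsignedInt(v);
    }

    // SMALLINT UNSIGNED arrives as a signed short.
    public static int fromSmallint(short v) {
        return Short.toUnsignedInt(v);
    }

    // INT UNSIGNED arrives as a signed int; widen to long.
    public static long fromInt(int v) {
        return Integer.toUnsignedLong(v);
    }

    // BIGINT UNSIGNED needs BigInteger, since values above
    // Long.MAX_VALUE cannot be represented in a signed long.
    public static BigInteger fromBigint(long v) {
        return new BigInteger(Long.toUnsignedString(v));
    }

    public static void main(String[] args) {
        System.out.println(fromTinyint((byte) -128)); // 128
        System.out.println(fromInt(-1));              // 4294967295
    }
}
```

Note that which conversion applies depends on knowing the column's declared type from the table metadata; the binlog row event alone does not carry the signedness.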
How to optimise use of MergeContent for large number of bins.
I have a NiFi workflow that reads from multiple MySQL binlogs and writes to HDFS. I am using CaptureChangeMySQL as the source and PutHDFS as the sink, with MergeContent processors in between to chunk the messages together for HDFS:

CDC (primary node only; about 200 of these processors, one per database) -> UpdateAttributes (db-table-hour) -> MergeContent, 500 msgs (binned on db-table-hour) -> MergeContent, 200 msgs (binned on db-table-hour) -> PutHDFS

I have about ~200 databases to read from, and ~2,500 tables altogether. The binlog update rate is around 1 Mbps per database. I am planning to run this on a 3-node NiFi cluster for now.

Has anyone used MergeContent with more than 2,000 bins before? Does it scale well? Can someone suggest any improvements or alternatives to this workflow?

Thanks,
Ashwin
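For reference, a first-stage MergeContent configuration matching the flow above might look like the sketch below. The property names are the processor's display names; the values are illustrative, not recommendations:

```properties
# Hypothetical first-stage MergeContent settings for the flow above.
Merge Strategy             = Bin-Packing Algorithm
Correlation Attribute Name = db-table-hour    # set earlier by UpdateAttribute
Minimum Number of Entries  = 500
Maximum number of Bins     = 2500             # should cover every active db-table-hour key
Max Bin Age                = 5 min            # flush stragglers so sparse bins don't linger
```

If the number of active correlation keys exceeds "Maximum number of Bins", the oldest bin is forced out before it is full, so the bin count has to be sized against the number of distinct db-table-hour combinations in flight.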
Unable to operate nifi ui due to "java.net.SocketTimeoutException: Read timed out"
Hi,

I am running a 3-node NiFi cluster (embedded ZooKeeper) with the following flow, which pulls data from MySQL binlogs using CaptureChangeMySQL and sends it to HDFS:

CDC [primary node only] -> outputPort -> RPG
inputPort -> MergeContent -> PutHDFS

I am using an RPG to distribute load among the other nodes in the cluster. CDC reads a large amount of data (~10 Mbps), but as soon as I start the CDC processor the NiFi UI hangs and I get "javax.ws.rs.ProcessingException: java.net.SocketTimeoutException: Read timed out". I have gone through various threads and tweaked properties such as:

nifi.cluster.node.connection.timeout = 30 secs
nifi.cluster.node.read.timeout = 30 secs
nifi.cluster.node.protocol.threads = 20
nifi.web.jetty.threads = 400

There is nothing unusual in the logs, and the data flow seems to continue. I am not able to figure out what is going wrong. Can someone please help me with it? Let me know what more information I need to provide in order to debug this.

Stacktrace:

2018-09-25 12:00:49,888 ERROR [LearnerHandler-/10.10.10.10:31955] o.a.z.server.quorum.LearnerHandler Unexpected exception causing shutdown while sock still open
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
    at org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:83)
    at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
    at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)
2018-09-25 12:00:51,901 INFO [Process Cluster Protocol Request-7] o.a.n.c.p.impl.SocketProtocolListener Finished processing request e181bdbd-00c7-4f3b-b69a-34ff7c1c9cee (type=HEARTBEAT, length=3565 bytes) from nifiapp-1002:8080 in 1 millis

Any help would be much appreciated.
Ashwin
Slow flowfile transfer from process group port to output port.
Hi,

I have the following flow whose throughput I am trying to increase. I am running a 3-node NiFi cluster (v1.6):

CDC -> processGroupOutputPort -> externalOutputPort
RPG -> downstream

Flowfiles are always queued up between processGroupOutputPort and externalOutputPort, and I don't see any configuration to increase the number of threads for this operation. I have set backpressure to 10,000 flowfiles. Since the CDC produces at a much faster rate than this, I want to increase the throughput of the flow. Also, in the UI, externalOutputPort always shows 3 running threads no matter how much I increase the number of threads to consume in the RPG.

Any pointers here would be much appreciated.

Ashwin
Re: Slow flowfile transfer from process group port to output port.
Hello Mike,

Thanks a lot for looking at it. I have already configured threads for the remote port in the RPG, but the congestion is in the transfer from the process group to the external port, and there is no configuration option to increase the number of threads on externalOutputPort; the number of running threads for the output port always shows as 3 in the UI. So I am not sure how to increase the throughput now. I quickly went through the implementation of LocalPort.java; it looks like flowfiles are transferred in batches of 100. Can you give me any other suggestions to increase throughput?

-Ashwin

On 2018/09/25 15:29:32, ashwin konale wrote:
> Hi I have the following flow which I am trying to increase the throughput.
> I am runnign 3 node nifi cluster(v1.6)
>
> CDC -> processGroupOutputPort -> externalOutputPort
> RPG -> downstream
>
> Flowfiles are always queued up between processGroupOutputPort to
> externalOutputPort. I dont see any configuration to increase the number of
> threads for this operation as well. I have set backpressure to be 10,000
> flowfiles. Since the CDC produces at much faster rate than this, I want to
> increase the throughput of the flow. Also I see in the UI
> externalOutputPort has number of threads running as 3 all the time no
> matter how much I increase number of threads to consume in RPG.
>
> Any pointers here will be much helpful.
>
> Ashwin
Re: Slow flowfile transfer from process group port to output port.
On 2018/09/25 15:34:03, Mark Payne wrote:
> Ashwin,
>
> You'll want to Right-Click on the RPG and then choose to configure Remote
> Ports. From there, you can configure how many threads should be used to
> pull data from each port. So you've updated the Output Port to use up to 3
> threads per node to provide the data, but each node is still only using 1
> thread to pull the data. By configuring the Remote Ports, you can configure
> the RPG to use 3 threads to pull the data as well.
>
> Thanks
> -Mark
>
> > On Sep 25, 2018, at 11:29 AM, ashwin konale wrote:
> >
> > Hi I have the following flow which I am trying to increase the throughput.
> > I am runnign 3 node nifi cluster(v1.6)
> >
> > CDC -> processGroupOutputPort -> externalOutputPort
> > RPG -> downstream
> >
> > Flowfiles are always queued up between processGroupOutputPort to
> > externalOutputPort. I dont see any configuration to increase the number of
> > threads for this operation as well. I have set backpressure to be 10,000
> > flowfiles. Since the CDC produces at much faster rate than this, I want to
> > increase the throughput of the flow. Also I see in the UI
> > externalOutputPort has number of threads running as 3 all the time no
> > matter how much I increase number of threads to consume in RPG.
> >
> > Any pointers here will be much helpful.
> >
> > Ashwin

Hi Mark,

Thanks a lot for the reply. I have already configured the RPG to use more threads on the remote ports. But since NiFi does not provide any option to increase the number of threads on externalOutputPort, it runs on 3 threads by default (I am not sure how it chose 3 threads). So flowfiles are always blocked between processGroupOutputPort and externalOutputPort. I quickly went through the source code of LocalPort.java; it looks like it transfers in batches of 100. Can you provide some guidance on how to improve the throughput?

Thanks,
Ashwin
Re: Unable to operate nifi ui due to "java.net.SocketTimeoutException: Read timed out"
On 2018/09/25 13:13:12, Mark Payne wrote:
> Hi Ashwin,
>
> The embedded ZooKeeper is provided as a convenience so that you can easily
> test running things in a cluster on a laptop, etc. However, it struggles
> when your nodes are handling any kind of significant data rate. It is
> always recommended that an external ZooKeeper be used for any sort of
> production use.
>
> Thanks
> -Mark
>
> > On Sep 25, 2018, at 6:03 AM, ashwin konale wrote:
> >
> > Hi,
> >
> > I am running 3 node nifi cluster (embedded zookeeper) with following flow.
> > It pulls the data from mysql binlogs using CaptureChangeMysql and sending
> > it to HDFS.
> >
> > CDC[Primary node only] -> outputPort -> RPG
> > inputPort -> MergeContent -> PutHdfs
> >
> > I am using RPG to distribute load among other nodes in cluster. CDC reads
> > large amount of data (~10 Mbps). But as soon as I start CDC processor, nifi
> > ui hangs, and I get "javax.ws.rs.ProcessingException:
> > java.net.SocketTimeoutException: Read timed out" , I have gone through
> > various threads and tweaked certain properties like,
> >
> > nifi.cluster.node.connection.timeout = 30 seconds
> > nifi.cluster.node.read.timeout = 30 sec
> > nifi.cluster.node.protocol.threads = 20
> > nifi.web.jetty.threads = 400
> >
> > Nothing unusual in the logs except data flow seems to be continuing. I am
> > not able to figure out what is going wrong. Can someone please help me with
> > it. Let me know what more information I need to provide in order to debug
> > this.
> >
> > Stacktrace:
> >
> > 2018-09-25 12:00:49,888 ERROR [LearnerHandler-/10.10.10.10:31955]
> > o.a.z.server.quorum.LearnerHandler Unexpected exception causing shutdown
> > while sock still open
> > java.net.SocketTimeoutException: Read timed out
> >     at java.net.SocketInputStream.socketRead0(Native Method)
> >     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> >     at java.net.SocketInputStream.read(SocketInputStream.java:171)
> >     at java.net.SocketInputStream.read(SocketInputStream.java:141)
> >     at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> >     at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> >     at java.io.DataInputStream.readInt(DataInputStream.java:387)
> >     at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
> >     at org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:83)
> >     at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
> >     at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)
> >
> > 2018-09-25 12:00:51,901 INFO [Process Cluster Protocol Request-7]
> > o.a.n.c.p.impl.SocketProtocolListener Finished processing request
> > e181bdbd-00c7-4f3b-b69a-34ff7c1c9cee (type=HEARTBEAT, length=3565 bytes)
> > from nifiapp-1002:8080 in 1 millis
> >
> > Any help would be much appreciated.
> > Ashwin

Thanks Mark. Will try that.
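Following Mark's advice, pointing the cluster at an external ensemble is a nifi.properties change on each node. A sketch (hostnames are placeholders):

```properties
# Disable the embedded ZooKeeper server on each node...
nifi.state.management.embedded.zookeeper.start=false

# ...and point NiFi at the external ensemble instead.
nifi.zookeeper.connect.string=zk1:2181,zk2:2181,zk3:2181
```

The external ensemble itself is configured in the usual ZooKeeper way (zoo.cfg, myid files) and is best run on hosts that are not also handling NiFi's data load.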
Re: Slow flowfile transfer from process group port to output port.
On 2018/09/25 15:34:03, Mark Payne wrote:
> Ashwin,
>
> You'll want to Right-Click on the RPG and then choose to configure Remote
> Ports. From there, you can configure how many threads should be used to
> pull data from each port. So you've updated the Output Port to use up to 3
> threads per node to provide the data, but each node is still only using 1
> thread to pull the data. By configuring the Remote Ports, you can configure
> the RPG to use 3 threads to pull the data as well.
>
> Thanks
> -Mark

Hello Mark,

Thanks a lot for the reply. I have already configured the number of threads on the remote ports in the RPG, but no matter how many threads I set, the number of threads running on the output port shows as 3, and there is no setting to change the number of threads for the output port. I quickly went through LocalPort.java (https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java); it looks like it transfers in batches of 100. Is there any way to increase the throughput?
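The batching behavior mentioned above can be illustrated with a small, self-contained sketch. This is not NiFi's actual LocalPort code, just a model of draining a queue at up to 100 flowfiles per trigger, which shows why throughput is bounded by trigger frequency times batch size:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public final class BatchedTransfer {

    // Mirrors the batch size observed in LocalPort.java.
    static final int BATCH_SIZE = 100;

    // Drain up to BATCH_SIZE items per "trigger"; return how many
    // triggers it took to empty the queue.
    public static int triggersToDrain(Deque<Integer> queue) {
        int triggers = 0;
        while (!queue.isEmpty()) {
            for (int i = 0; i < BATCH_SIZE && !queue.isEmpty(); i++) {
                queue.poll();
            }
            triggers++;
        }
        return triggers;
    }

    public static void main(String[] args) {
        Deque<Integer> q = new ArrayDeque<>();
        for (int i = 0; i < 10_000; i++) {
            q.add(i);
        }
        // 10,000 queued flowfiles need 100 triggers at 100 per batch.
        System.out.println(triggersToDrain(q)); // 100
    }
}
```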
Fixing unstable nifi cluster.
Hi,

We have a 3-node NiFi cluster (with separate ZooKeeper instances running on the same machines) that pulls data from MySQL and writes to HDFS. I am frequently running into problems with the cluster: nodes keep disconnecting from each other, the primary node keeps switching, and sometimes the cluster goes into a zombie state where I just cannot access the UI. I have followed the best-practices guide and tweaked parameters in nifi.properties, and I have switched the provenance repository implementation to volatile because the cluster could not keep up with the incoming traffic. The data rate is not high at all (4 Mbps).

These are the messages I frequently see in the logs:

INFO [main-EventThread] o.a.c.f.state.ConnectionStateManager State change: LOST
INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@56ebedec Connection State changed to LOST
INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@1b0e2055 Connection State changed to LOST
INFO [main-EventThread] o.a.c.f.state.ConnectionStateManager State change: RECONNECTED

Am I doing something wrong with the cluster setup? Can someone give me some guidance on how to debug this issue, e.g. what kind of system metrics to look at?

Ashwin
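The LOST/RECONNECTED churn in the log above typically indicates the ZooKeeper session expiring, often during long GC pauses on the NiFi JVM. One commonly suggested mitigation, alongside GC tuning, is to give the session more headroom in nifi.properties (the values below are illustrative, not recommendations):

```properties
# Longer ZooKeeper timeouts give nodes more headroom during GC pauses.
nifi.zookeeper.connect.timeout=30 secs
nifi.zookeeper.session.timeout=30 secs
```

It is also worth watching GC pause times and disk I/O wait on the nodes, since co-locating ZooKeeper with a busy NiFi instance makes both sensitive to the same resource pressure.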
Scaling source processors in nifi horizontally.
Hi,

I am experimenting with NiFi for one of our use cases, with plans to extend it to various other data-routing and ingestion use cases. Right now I need to ingest data from MySQL binlogs to HDFS/GCS. We have around 250 different schemas and about 3,000 tables to read data from. The volume ranges from 500 to 2,000 messages per second across schemas.

The problem is that the MySQL CDC processor can only run in a single thread. To work around this, I see two options:

1. Use primary-node execution, with a separate processor per schema. Eventually all processors that read from MySQL will run on a single node, which will be a bottleneck no matter how big my NiFi cluster is.

2. Use multiple NiFi instances to pull data, with a master NiFi cluster for ingestion to the various sinks. In this approach I will have to manage all of these small NiFi instances, and may have to build some kind of tooling on top of them to monitor them and provision new processors for newly added schemas.

Is there a better way to achieve my use case with NiFi? Please advise me on the architecture.

Looking forward to suggestions.

- Ashwin
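For option 2, one way to keep the schema-to-instance assignment deterministic without extra coordination state is to hash each schema name to an instance. A hypothetical sketch (the class and method names are mine, and a real deployment would want a stable hash rather than String.hashCode, which can differ across JVM versions of other languages' runtimes but is stable in Java):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class SchemaAssigner {

    // Deterministically map a schema name to one of N NiFi instances.
    // floorMod guards against negative hashCode values.
    public static int instanceFor(String schema, int instanceCount) {
        return Math.floorMod(schema.hashCode(), instanceCount);
    }

    // Group all schemas by the instance that should run their CDC processor.
    public static Map<Integer, List<String>> assign(List<String> schemas, int instanceCount) {
        Map<Integer, List<String>> out = new TreeMap<>();
        for (String s : schemas) {
            out.computeIfAbsent(instanceFor(s, instanceCount), k -> new ArrayList<>()).add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("schema1", "schema2", "schema3"), 3));
    }
}
```

Each small NiFi instance would then only provision CDC processors for the schemas assigned to it, and adding a schema requires no coordination beyond re-running the same assignment everywhere.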
Re: Scaling source processors in nifi horizontally.
Hi,

The flow is like this:

MysqlCDC -> UpdateAttributes -> MergeContent -> (PutHDFS, PutGCS)

But we have around 250 schemas to pull data from, so with a clustered setup:

MysqlCDC_schema1 -> RPG
MysqlCDC_schema2 -> RPG
MysqlCDC_schema3 -> RPG
and so on

InputPort -> UpdateAttributes -> MergeContent -> (PutHDFS, PutGCS)

But since MysqlCDC can only run on the primary node in the cluster, I will end up running all of the input processors on a single node. This can easily become a bottleneck as the number of schemas grows. Could you suggest any alternative approach to this problem?

On 2018/10/17 21:14:09, Mike Thomsen wrote:
> > may have to build some kind of tooling on top of it to monitor/provision
> > new processor for newly added schemas etc.
>
> Could you elaborate on this part of your use case?
>
> On Wed, Oct 17, 2018 at 2:31 PM ashwin konale wrote:
> >
> > Hi,
> >
> > I am experimenting with nifi for one of our usecases with plans of
> > extending it to various other data routing, ingestion usecases. Right now I
> > need to ingest data from mysql binlogs to hdfs/GCS. We have around 250
> > different schemas and about 3000 tables to read data from. Volume of the
> > data flow ranges from 500 - 2000 messages per second in different schemas.
> >
> > Right now the problem is mysqlCDC processor can run in only one thread. To
> > overcome this issue I have two options.
> >
> > 1. Use primary node execution, so different processors for each of the
> > schemas. So eventually all processors which reads from mysql will run in
> > single node, which will be a bottleneck no matter how big my nifi cluster
> > is.
> >
> > 2. Another approach is to use multiple nifi instances to pull data and have
> > master nifi cluster for ingestion to various sinks. In this approach I will
> > have to manage all these small nifi instances, and may have to build some
> > kind of tooling on top of it to monitor/provision new processor for newly
> > added schemas etc.
> >
> > Is there any better way to achieve my usecase with nifi ? Please advice me
> > on the architechture.
> >
> > Looking forward for suggestion.
> >
> > - Ashwin
Reading avro encoded message key from kafka
Hi,

I have key-value pairs of Avro messages in a Kafka topic I want to consume from. I can easily modify the message value using the ConsumeKafkaRecord processor, but it doesn't expose the key of the message. The ConsumeKafka processor has a kafka.key attribute, but I am not sure how to read its contents (since it is Avro-encoded) and add certain values as attributes to the flowfile. Any pointers would be much appreciated.

Thanks
Re: Reading avro encoded message key from kafka
Hi,

It's not an encoding issue; I am not able to figure out how to read the Kafka key itself. e.g.:

Kafka key = {type: foo, meta: etc, etc}
Kafka message = {Avro payload}

I want to use the RouteOnAttribute processor based on type = foo or bar. For this to happen, I need to extract the value foo from kafka.key into the flowfile. Basically, I am not able to figure out how to read the key of a Kafka message and extract attributes from it in NiFi. Could you suggest something here?

Thanks

On 2018/11/23 15:14:53, Mike Thomsen wrote:
> If you are having encoding-related issues with reading that attribute, try
> switching to the Kafka string serializer in your producer.
>
> On Fri, Nov 23, 2018 at 10:12 AM ashwin konale wrote:
> >
> > Hi,
> > I have key-value pair of avro messages in kafka topic I want to consume
> > from. I can easily do modifications on message value using nifi
> > consumeKafkaRecord processor, but it doesnt show key of the message.
> > ConsumeKafka processor has kafka.key attribute but I am not sure how to
> > read its contents(Since it is avro encoded) and add certain values as
> > attributes to flowfile. Any pointers will be much helpful.
> >
> > Thanks
Re: Reading avro encoded message key from kafka
Hi,

Thanks a lot for the suggestion. I didn't know about the jsonPath EL functions; I can easily use that in my use case.

- Ashwin

On 2018/11/27 18:52:05, Bryan Bende wrote:
> Unfortunately I don't think there is a good way to interpret the value
> of the key when it is Avro, because we don't have any expression
> language functions that understand Avro or record-oriented values.
>
> The main option would be to change how the data is being produced in some way...
>
> - Put the value you are interested in in a message header; then it
>   will come across as a string key/value pair in a flow file attribute,
>   and use RouteOnAttribute.
> - Put the value you are interested in in the message body somewhere,
>   and use PartitionRecord to route on the value of that field in the message.
> - Use a different kind of key serialization, like JSON, which can then
>   be parsed with expression language functions.
>
> A possible improvement we could make is to add some kind of
> "avro-to-json" EL function, then from there use the EL jsonPath function.
>
> On Tue, Nov 27, 2018 at 1:01 PM Ashwin Konale wrote:
> >
> > Hi,
> > Its not encoding issue. I am not able to figure out how to read Kafka key
> > itself.
> > eg.
> > Kafka key = {type: foo, meta: etc, etc }
> > Kafka message = {Avro Payload}
> >
> > I want to use RouteOnAttribute processor based on type = foo or bar. For
> > this to happen, I need to extract value foo from Kafka.key to flow file.
> > Basically I am not able to figure out how to read key and extract
> > attributes from it from Kafka message in nifi. Could you suggest me
> > something here.
> >
> > Thanks
> >
> > On 2018/11/23 15:14:53, Mike Thomsen wrote:
> > > If you are having encoding-related issues with reading that attribute, try
> > > switching to the Kafka string serializer in your producer.
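As a concrete example of Bryan's third option: if the producer can write the key as JSON instead of Avro, RouteOnAttribute dynamic properties can parse it with the jsonPath expression-language function. The property names and JSON shape below are illustrative:

```properties
# Hypothetical RouteOnAttribute dynamic properties, assuming the producer
# now serializes the key as JSON and ConsumeKafka exposes it in kafka.key.
is-foo = ${kafka.key:jsonPath('$.type'):equals('foo')}
is-bar = ${kafka.key:jsonPath('$.type'):equals('bar')}
```

Each property becomes a relationship, so a flowfile whose key is {"type": "foo", ...} would route to the is-foo relationship.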