Chris,

Sorry for getting to this late.
This was indeed intentional, primarily to follow Kafka's best practices: Kafka was not designed to be a general data-transfer system for payloads of arbitrary size, but rather of "manageable" size. Also, as you know, we have the ability to demarcate messages, essentially allowing you to send a very large FlowFile that will be parsed into chunks, where each chunk ends up as a separate Kafka message. That said, we did consider that at some point we might expose it as a configuration property, and it seems the time has come.

Cheers
Oleg

> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <[email protected]> wrote:
>
> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
>
> Thanks,
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)"
> <[email protected]> wrote:
>
> I'll raise a JIRA, Joe.
>
> Thanks,
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
> On 8/20/16, 6:52 PM, "Joe Witt" <[email protected]> wrote:
>
> If no JIRA is raised sooner, I'll raise one and get it sorted.
>
> On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <[email protected]>
> wrote:
>
>> Hi Chris,
>> Sorry for not catching that code path. I am not sure it is actually a
>> regression, as I took a look at the 1.0.0-BETA code and it matches
>> 0.7.0; specifically, this comment block:
>>
>> /*
>>  * We're using the default value from Kafka. We are using it to control the
>>  * message size before it goes to to Kafka thus limiting possibility of a
>>  * late failures in Kafka client.
>>  */
>>
>> found at [1] leads me to believe it was intentional and not a regression.
>> Looking at the 0.6.1 release code, it appears that PutKafka used a default
>> of 5 MB [2].
>>
>> I can speculate on the reasoning behind it; however, I will refrain from
>> opining on it, as I was not involved in any of the conversations related
>> to the change and the enforcement of the 1 MB max.
>>
>> [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
>> [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>>
>> Thanks,
>> Andrew
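For readers tracing the code path Andrew describes at [1]: the effect is that the limit is pinned to Kafka's 1 MB default inside NiFi itself. A rough, self-contained sketch of that shape (simplified for illustration; the real PublishingContext differs in detail):

    // Simplified sketch of the behavior quoted above; not the actual NiFi source.
    public class PublishingContextSketch {

        // Kafka's default max.request.size, hard-coded: 1 MB.
        private volatile int maxRequestSize = 1048576;

        public int getMaxRequestSize() {
            return maxRequestSize;
        }

        // The setter exists, but per this thread it is only reached from
        // test code, so the processors always publish with the 1 MB default.
        void setMaxRequestSize(int maxRequestSize) {
            this.maxRequestSize = maxRequestSize;
        }
    }

KafkaPublisher then hands getMaxRequestSize() to the StreamDemarcator constructor, which is why the failure surfaces inside NiFi before any bytes reach the Kafka client.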
>>
>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>
>>> Thanks, Andrew.
>>>
>>> I've set all of the right broker configs to allow larger messages.
>>> Believe me, I spent a lot of time banging my head against the wall
>>> thinking that the broker and topic configs were wrong.
>>>
>>> PublishKafka uses PublishingContext. That class has a bean property
>>> called maxRequestSize, which defaults to 1048576. As far as I can tell,
>>> the setMaxRequestSize() method is never called (except by some test
>>> code). KafkaPublisher.publish() reads the context's getMaxRequestSize()
>>> and passes the result to the constructor of StreamDemarcator. The
>>> publish method then calls StreamDemarcator.nextToken(), which in turn
>>> calls StreamDemarcator.fill(), which compares the stream position
>>> against maxRequestSize and throws the exception with this line:
>>>
>>> throw new IllegalStateException("Maximum allowed data size of " +
>>>         this.maxDataSize + " exceeded.");
>>>
>>> Which matches what I see in the nifi-app.log file:
>>>
>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8]
>>> o.apache.nifi.processors.kafka.PutKafka
>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>>> at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
>>> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>
>>> This occurs using both PublishKafka and PutKafka. Setting the Max
>>> Record Size property in the PutKafka processor has no effect on this.
>>> Note the stack trace above is from the PutKafka processor with Max
>>> Record Size set to 10 MB.
>>>
>>> I believe this is a regression from 0.6.0.
>>>
>>> Chris McDermott
>>>
>>> Remote Business Analytics
>>> STaTS/StoreFront Remote
>>> HPE Storage
>>> Hewlett Packard Enterprise
>>> Mobile: +1 978-697-5315
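The guard Chris points at is easy to reproduce in isolation. Here is a minimal stand-in for the size check, assuming a simplified nextToken/fill pair (the real StreamDemarcator also handles delimiters, but the guard is the same idea):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Minimal stand-in for the check in StreamDemarcator.fill(): buffer the
    // stream and fail once the number of buffered bytes crosses maxDataSize.
    public class DemarcatorSizeCheckSketch {

        static byte[] nextToken(InputStream is, int maxDataSize) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int total = 0;
            int read;
            while ((read = is.read(buffer)) != -1) {
                total += read;
                if (total > maxDataSize) {
                    // The exact message from the nifi-app.log excerpt above
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            byte[] twoMb = new byte[2 * 1024 * 1024];
            // 1048576 == the hard-coded PublishingContext default
            nextToken(new ByteArrayInputStream(twoMb), 1048576); // throws
        }
    }

A 2 MB payload with no demarcation therefore fails no matter how the broker and topic are configured, which matches Chris's observation above.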
>>>
>>>
>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <[email protected]> wrote:
>>>
>>> Hi Chris,
>>> Regarding the PutKafka processor: looking at this block [1] of the
>>> PutKafka code, it has a default size of 1 MB, but it does not restrict
>>> the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
>>> that the value entered is in the correct format: <value> [B|KB|MB|GB|TB].
>>> Later on in the code, at this block [2], the value is set on the Kafka
>>> config; again, this does not enforce a maximum.
>>>
>>> In regards to the PublishKafka processor, I do not see where it accepts
>>> a size or restricts the size at all.
>>>
>>> Have you adjusted the 'message.max.bytes' config value for your
>>> broker(s)? The default value for that is 1 MB [3]. (The URL references
>>> Kafka 0.8; however, I believe this default has been stable since the
>>> early days of the project.)
>>>
>>> If you really do want to send messages that are larger than 1 MB in
>>> size, I would highly recommend reading this post [4] from Gwen Shapira.
>>> It does a great job of outlining the things you need to take into
>>> consideration, and it will also point you to the relevant configs in
>>> Kafka that will need to be adjusted if you decide to go this route.
>>>
>>> Thanks,
>>> Andrew
>>>
>>> [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>>> [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>>> [3] https://kafka.apache.org/08/configuration.html
>>> [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>>>
>>> On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> From experimentation and looking at the code, it seems that the max
>>>> message size that can be sent via the PublishKafka and PutKafka
>>>> processors in 0.7.0 is 1 MB. Can someone please confirm my read on
>>>> this?
>>>>
>>>> Thanks,
>>>>
>>>> Chris McDermott
>>>>
>>>> Remote Business Analytics
>>>> STaTS/StoreFront Remote
>>>> HPE Storage
>>>> Hewlett Packard Enterprise
>>>> Mobile: +1 978-697-5315
>>>
>>>
>>> --
>>> Thanks,
>>> Andrew
>>>
>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>>
>>
>> --
>> Thanks,
>> Andrew
>>
>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
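For anyone who lands on this thread with the same symptom: until the limit is exposed as a processor property (tracked in NIFI-2614 above), messages over 1 MB have to be allowed at every hop, not just in NiFi. A hedged sketch of the producer-side settings, following the configs Andrew's links [3] and [4] cover (the property names are standard Kafka configs; the host and the 10 MB value are illustrative):

    import java.util.Properties;

    // Illustrative producer settings for >1 MB messages. The broker must also
    // allow the larger size (message.max.bytes, or max.message.bytes per
    // topic), replicas must be able to fetch it (replica.fetch.max.bytes),
    // and consumers must be able to read it (fetch.message.max.bytes on the
    // old consumer, max.partition.fetch.bytes on the new one).
    public class LargeMessageProducerProps {

        public static Properties largeMessageProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder host
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Raise the client-side cap from its 1 MB default to 10 MB.
            props.put("max.request.size", String.valueOf(10 * 1024 * 1024));
            return props;
        }
    }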
