Exactly what you describe, learning the limit from the topic metadata, is what we should do. I am looking into this along with another issue at the moment.
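
A minimal sketch of what querying the limit from the topic config could look like, assuming a Kafka client whose AdminClient exposes describeConfigs (the broker address and topic name below are placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicMaxMessageSize {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            try (AdminClient admin = AdminClient.create(props)) {
                // Ask the broker for the effective topic config; max.message.bytes falls
                // back to the broker-level default when it is not set on the topic itself.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder topic
                Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
                long maxMessageBytes = Long.parseLong(config.get("max.message.bytes").value());
                System.out.println("max.message.bytes = " + maxMessageBytes);
            }
        }
    }

A lookup like this could happen when the processor is scheduled, so the flow designer would not have to keep a separate max-size property in sync with the topic.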
On Aug 21, 2016 6:15 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <[email protected]> wrote:

> Thanks for getting back, Oleg. I'd be happy to send demarcated messages; the problem is reassembling them at the other end of the pipe. I've done a lot of searching for techniques to do this. They all seem to have major drawbacks in terms of reliable message delivery or in terms of garbage collection (not in the sense of Java GC, but cleanup of files bounced off of a shared file system). The nice thing about Kafka is that it's atomic, it has replicated delivery, and it has guaranteed GC semantics. My use case has fairly low throughput requirements (thousands, not millions of TPM) where most messages are fairly small but a few are larger.
>
> It would be nice if the Kafka client could learn the max message size from Kafka itself by querying max.message.bytes on the topic, rather than requiring the flow designer to set it on the producer processors. For now, though, I'd be happy going back to the old behavior where it's set on the producer processors. On the flip side, I am also concerned that the clients (GetKafka and ConsumeKafka) do not expose a max message parameter. That will be equally problematic.
>
> Anyway, enough of my blathering.
>
> Yours and the community's help is greatly appreciated.
>
> Thanks,
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
> On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <[email protected]> wrote:
>
> Chris
>
> Sorry for getting to this late.
>
> This was indeed intentional, primarily to follow Kafka's best practices: Kafka was not designed to be a general data transfer system for payloads of arbitrary size, but rather of a "manageable" size. Also, as you know, we have the ability to demarcate messages, essentially allowing you to send a very large FlowFile that will be parsed into chunks where each chunk ends up as a separate Kafka message.
>
> That said, we did consider that at some point we may expose it as a configuration property, and it seems the time has come.
>
> Cheers
> Oleg
>
> > On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <[email protected]> wrote:
> >
> > Jira is https://issues.apache.org/jira/browse/NIFI-2614.
> >
> > Thanks,
> >
> > Chris McDermott
> >
> > Remote Business Analytics
> > STaTS/StoreFront Remote
> > HPE Storage
> > Hewlett Packard Enterprise
> > Mobile: +1 978-697-5315
> >
> >
> > On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <[email protected]> wrote:
> >
> > I'll raise a JIRA, Joe.
> >
> > Thanks,
> >
> > Chris McDermott
> >
> > Remote Business Analytics
> > STaTS/StoreFront Remote
> > HPE Storage
> > Hewlett Packard Enterprise
> > Mobile: +1 978-697-5315
> >
> >
> > On 8/20/16, 6:52 PM, "Joe Witt" <[email protected]> wrote:
> >
> > If no jira is raised sooner I'll raise one and get it sorted.
> >
> > On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <[email protected]> wrote:
> >
> >> Hi Chris,
> >> Sorry for not catching that code path. I am not sure it is actually a regression, as I took a look at the 1.0.0-BETA code and it matches 0.7.0, specifically this comment block:
> >>
> >> /*
> >> * We're using the default value from Kafka. We are using it to control the
> >> * message size before it goes to to Kafka thus limiting possibility of a
> >> * late failures in Kafka client.
> >> */
> >>
> >> found at [1], which leads me to believe it was intentional and not a regression. Looking at the 0.6.1 release code, it appears that PutKafka used a default of 5 MB [2].
> >>
> >> I can speculate on the reasoning behind it; however, I will refrain from opining on it, as I was not involved in any of the conversations related to the change and enforcement of the 1 MB max.
> >>
> >> [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
> >> [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
> >>
> >> Thanks,
> >> Andrew
> >>
> >> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <[email protected]> wrote:
> >>
> >>> Thanks, Andrew.
> >>>
> >>> I've set all of the right broker configs to allow larger messages. Believe me, I spent a lot of time banging my head against the wall thinking that the broker and topic configs were wrong.
> >>>
> >>> PublishKafka uses PublishingContext. That class has a bean property called maxRequestSize, which defaults to 1048576. As far as I can tell, the setMaxRequestSize() method is never called (except by some test code). KafkaPublisher.publish() calls getMaxRequestSize() and passes the result to the constructor of StreamDemarcator. The publish method then calls StreamDemarcator.nextToken(), which in turn calls StreamDemarcator.fill(), which compares the stream position against the maxRequestSize and throws the exception from this line:
> >>>
> >>> throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
> >>>
> >>> Which matches what I see in the nifi-app.log file…
> >>>
> >>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> >>> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> >>>     at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
> >>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> >>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> >>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> >>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
> >>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
> >>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> >>>
> >>> This occurs using both PublishKafka and PutKafka. Setting the Max Record Size property in the PutKafka processor has no effect on this. Note that the stack trace above is from the PutKafka processor with Max Record Size set to 10 MB.
> >>>
> >>> I believe that this is a regression from 0.6.0.
> >>>
> >>> Chris McDermott
> >>>
> >>> Remote Business Analytics
> >>> STaTS/StoreFront Remote
> >>> HPE Storage
> >>> Hewlett Packard Enterprise
> >>> Mobile: +1 978-697-5315
> >>>
> >>>
> >>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <[email protected]> wrote:
> >>>
> >>> Hi Chris,
> >>> Regarding the PutKafka processor: looking at this block [1] of the PutKafka code, it has a default size of 1 MB, but it does not restrict the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value entered is in the correct format: <value> [B|KB|MB|GB|TB]. Later on in the code, at this block [2], the value is set on the Kafka config; again, this does not enforce a maximum.
> >>>
> >>> As for the PublishKafka processor, I do not see where it accepts a size or restricts the size at all.
> >>>
> >>> Have you adjusted the 'message.max.bytes' config value for your broker(s)? The default value for that is 1 MB [3] (the URL references Kafka 0.8; however, I believe this default has been stable since the early days of the project).
> >>>
> >>> If you really do want to send messages that are larger than 1 MB in size, I would highly recommend reading this post [4] from Gwen Shapira. It does a great job of outlining the things you need to take into consideration and will also point you to the relevant configs in Kafka that will need to be adjusted if you decide to go this route.
> >>>
> >>> Thanks,
> >>> Andrew
> >>>
> >>> [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> >>> [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> >>> [3] https://kafka.apache.org/08/configuration.html
> >>> [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> >>>
> >>> On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <[email protected]> wrote:
> >>>
> >>>> Hi folks,
> >>>>
> >>>> From experimentation and looking at the code, it seems that the max message size that can be sent via the PublishKafka and PutKafka processors in 0.7.0 is 1 MB. Can someone please confirm my read on this?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Chris McDermott
> >>>>
> >>>> Remote Business Analytics
> >>>> STaTS/StoreFront Remote
> >>>> HPE Storage
> >>>> Hewlett Packard Enterprise
> >>>> Mobile: +1 978-697-5315
> >>>>
> >>>
> >>> --
> >>> Thanks,
> >>> Andrew
> >>>
> >>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> >>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> >>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
> >>
> >> --
> >> Thanks,
> >> Andrew
> >>
> >> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> >> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> >> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
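
As an aside, for anyone who does need records above the default limit: a minimal producer-side sketch, not a recommendation. The bootstrap server, topic name, and sizes below are placeholders, and per Gwen's post referenced above, the broker/topic settings (message.max.bytes, replica.fetch.max.bytes) and the consumer fetch size have to be raised to match, or large records will be rejected or never consumed.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class LargeMessageProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Raise the producer-side cap; broker/topic and consumer limits must be raised to match.
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10 * 1024 * 1024); // 10 MB, illustrative only
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                byte[] payload = new byte[5 * 1024 * 1024]; // a 5 MB record, above the 1 MB default
                producer.send(new ProducerRecord<>("my-topic", payload)); // placeholder topic
                producer.flush();
            }
        }
    }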
