Chris

Sorry for getting late to this

This indeed was intentional, primarily to follow Kafka’s best practices where 
Kafka was not designed to be a general data transfer system of arbitrary size 
but rather “manageable” size. Also, as you know we have ability to demarcate 
messages essentially allowing you to send a very large FlowFile that will be 
parsed into chunks where each chunk will end up to be a separate Kafka message.
That said, we did consider that at some point we may expose it as configuration 
property and it seems like the time has come.

Cheers
Oleg

> On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU - 
> STaTS/StorefrontRemote) <[email protected]> wrote:
> 
> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
> 
> Thanks,
> 
> Chris McDermott
> 
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
> 
> 
> 
> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" 
> <[email protected]> wrote:
> 
>    I’ll raise a JIRA, Joe.
> 
>    Thanks,
> 
>    Chris McDermott
> 
>    Remote Business Analytics
>    STaTS/StoreFront Remote
>    HPE Storage
>    Hewlett Packard Enterprise
>    Mobile: +1 978-697-5315
> 
> 
> 
>    On 8/20/16, 6:52 PM, "Joe Witt" <[email protected]> wrote:
> 
>        If no jira is raised sooner I'll raise one and get it sorted.
> 
>        On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <[email protected]> 
> wrote:
> 
>> Hi Chris,
>> Sorry for not catching that code path. I am not sure if it is actually a
>> regression as I took a look at the 1.0.0-BETA code and it matches the
>> 0.7.0, specifically this comment block:
>> 
>> /*
>> * We're using the default value from Kafka. We are using it to control the
>> * message size before it goes to to Kafka thus limiting possibility of a
>> * late failures in Kafka client.
>> */
>> 
>> found at[1] leads me to believe it was intentional and not a regression.
>> Looking at the 0.6.1 release code it appears that PutKafka used a default
>> of 5 MB [2].
>> 
>> I can speculate on the reasoning behind it, however, I will refrain from
>> opining on it as I was not involved in any of the conversations related to
>> the change and enforcement of the 1 MB max.
>> 
>> [1]
>> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-
>> official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-
>> processors/src/main/java/org/apache/nifi/processors/kafka/
>> PublishingContext.java#L36-L41
>> [2]
>> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-
>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>> 
>> Thanks,
>> Andrew
>> 
>> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>> STaTS/StorefrontRemote) <[email protected]> wrote:
>> 
>>> Thanks, Andrew.
>>> 
>>> I’ve set all of the right broker configs to allow larger messages.
>>> Believe me I spent a lot of time banging my head against the wall
>> thinking
>>> that the broker and topic configs were wrong.
>>> 
>>> PublisingKafka uses PublishingContext.  That class has bean property
>>> called maxRequestSize, which defaults to 1048576.  As far as I can tell
>> the
>>> setMaxRequestSize() method is never called (except by some test code.)
>>> KafkaPublisher.publish() calls Max Record Size.getMaxRequestSize() and
>>> passes the result to the constructor for StreamDemarcator.   The publish
>>> method then calls the StreamDemarcator. getNextToken(), which in turns
>>> calls StreamDemarcator.fill() which compares the stream position against
>>> the maxRequestSize and throws the exception with this line.
>>> 
>>> throw new IllegalStateException("Maximum allowed data size of " +
>>> this.maxDataSize + " exceeded.");
>>> 
>>> Which matches what I see in the nifi-app.log file…
>>> 
>>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8]
>>> o.apache.nifi.processors.kafka.PutKafka
>>> java.lang.IllegalStateException: Maximum allowed data size of 1048576
>>> exceeded.
>>>        at org.apache.nifi.stream.io.util.StreamDemarcator.fill(
>> StreamDemarcator.java:153)
>>> ~[nifi-utils-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.stream.io.util.StreamDemarcator.
>>> nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(
>> KafkaPublisher.java:129)
>>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka$1.process(
>> PutKafka.java:315)
>>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.repository.
>>> StandardProcessSession.read(StandardProcessSession.java:1851)
>>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.repository.
>>> StandardProcessSession.read(StandardProcessSession.java:1822)
>>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka.
>>> doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.
>>> jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.PutKafka.
>>> rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.
>>> jar:0.7.0]
>>>        at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.
>>> onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.
>>> jar:0.7.0]
>>>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>>> StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.
>>> jar:0.7.0]
>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.
>>> jar:0.7.0]
>>>        at org.apache.nifi.controller.scheduling.
>>> TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127)
>>> [nifi-framework-core-0.7.0.jar:0.7.0]
>>>        at java.util.concurrent.Executors$RunnableAdapter.
>> call(Executors.java:511)
>>> [na:1.8.0_45]
>>>        at java.util.concurrent.FutureTask.runAndReset(
>> FutureTask.java:308)
>>> [na:1.8.0_45]
>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$
>>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> [na:1.8.0_45]
>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$
>>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> [na:1.8.0_45]
>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1142)
>>> [na:1.8.0_45]
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:617)
>>> [na:1.8.0_45]
>>>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>> 
>>> This occurs using PublishKafka, and PutKafka.  Setting the Max Record
>> Size
>>> property in the PutKafka processor has no affect on this.  Note the stack
>>> trace above is from the PutKafka processor with Max Record Size set to
>> 10MB.
>>> 
>>> I believe that this a regression from 0.6.0.
>>> 
>>> Chris McDermott
>>> 
>>> Remote Business Analytics
>>> STaTS/StoreFront Remote
>>> HPE Storage
>>> Hewlett Packard Enterprise
>>> Mobile: +1 978-697-5315
>>> 
>>> 
>>> 
>>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <[email protected]> wrote:
>>> 
>>>    Hi Chris,
>>>    Regarding the PutKafka processor looking at this block[1] of the
>>> PutKafka
>>>    code, it has a default size of 1 MB, but it does not restrict the
>>> size. The
>>>    DATA_SIZE_VALIDATOR does a sanity check and also enforces that
>>>    the supported value entered is the correct format <value> [B|
>>> KB|MB|GB|TB].
>>>    Later on in the code at this block[2], the value is set on the Kafka
>>>    config, again this does not enforce a value maximum.
>>> 
>>>    In regards to the PublishKafka processor I do not see where it
>> accepts
>>> a
>>>    size nor restrict the size at all.
>>> 
>>>    Have you adjusted the 'message.max.bytes' config value for your
>>> broker(s)?
>>>    The default value for that is 1 MB [3] (The url references the 0.8
>>> Kafka,
>>>    however I believe this default has been stable since the early days
>> of
>>> the
>>>    project.)
>>> 
>>>    If you really do want to send messages that are larger than 1 MB in
>>> size, I
>>>    would highly recommending reading this post[4] from Gwen Shapira.  It
>>> does
>>>    a great job of outlining the things you need to take into
>>> consideration.
>>>    This will also point you to the relevant configs in Kafka that will
>>> need to
>>>    be adjusted if you decide to go this route.
>>> 
>>> 
>>>    Thanks,
>>>    Andrew
>>> 
>>>    [1]
>>>    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>>>    [2]
>>>    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>>>    [3] https://kafka.apache.org/08/configuration.html
>>>    [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>>> 
>>>    On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>>>    STaTS/StorefrontRemote) <[email protected]> wrote:
>>> 
>>>> Hi folks,
>>>> 
>>>> 
>>>> 
>>>> From experimentation and looking at the code it seems that the max
>>> message
>>>> size that can be sent via the PublishKafka and PutKafka processors
>>> in 0.7.0
>>>> is 1MB.  Can someone please confirm my read on this?
>>>> 
>>>> 
>>>> 
>>>> Thanks,
>>>> 
>>>> 
>>>> 
>>>> Chris McDermott
>>>> 
>>>> 
>>>> 
>>>> Remote Business Analytics
>>>> 
>>>> STaTS/StoreFront Remote
>>>> 
>>>> HPE Storage
>>>> 
>>>> Hewlett Packard Enterprise
>>>> 
>>>> Mobile: +1 978-697-5315
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>>    --
>>>    Thanks,
>>>    Andrew
>>> 
>>>    Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>>>    <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>    twiiter: @itmdata <http://twitter.com/intent/
>> user?screen_name=itmdata>
>>> 
>>> 
>>> 
>> 
>> 
>> --
>> Thanks,
>> Andrew
>> 
>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>> twiiter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>> 
> 
> 
> 
> 

  • Max Kafka message s... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
    • Re: Max Kafka ... Andrew Psaltis
      • Re: Max Ka... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
        • Re: Ma... Andrew Psaltis
          • Re... Joe Witt
            • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
              • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... Oleg Zhurakousky
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... Oleg Zhurakousky
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)

Reply via email to