Exactly what you describe based off topic metadata is what we should do.
Am looking into this with another issue at moment.

On Aug 21, 2016 6:15 PM, "McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote)" <[email protected]> wrote:

> Thanks for getting back, Oleg. I’d be happy to send demarcated messages;
> the problem is reassembling them at the other end of the pipe.  I’ve done a
> lot of searching for techniques to do this.  They all seem to have major
> draw backs in terms of reliable message delivery or in terms of garbage
> collection (not in the sense of Java GC, but cleanup of files bounced off
> of a shared file system.) The nice thing about Kafka is its atomic, it has
> replicated delivery, and guaranteed GC semantics.   My use case has fairly
> low throughput requirements (thousands, not millions of TPM) where most
> messages are fairly small but a few are larger.
>
> It would be nice if the Kafka client could learn the max message size from
> Kafka itself by querying the max.message.bytes on the topic, rather than
> have the flow designer be required to set it on the producer Processors.
> For now, though I’d be happy going back to the old behavior where its set
> on the producer Processors.  On the flip side I am also concerned that the
> clients (GetKafka and ConsumeKafka) do not expose a max message parameter.
> That will be equally problematic.
>
> Anyway, enough of my blathering.
>
> Yours and the communities help is greatly appreciated.
>
> Thanks,
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
>
> On 8/21/16, 3:43 PM, "Oleg Zhurakousky" <[email protected]>
> wrote:
>
>     Chris
>
>     Sorry for getting late to this
>
>     This indeed was intentional, primarily to follow Kafka’s best
> practices where Kafka was not designed to be a general data transfer system
> of arbitrary size but rather “manageable” size. Also, as you know we have
> ability to demarcate messages essentially allowing you to send a very large
> FlowFile that will be parsed into chunks where each chunk will end up to be
> a separate Kafka message.
>     That said, we did consider that at some point we may expose it as
> configuration property and it seems like the time has come.
>
>     Cheers
>     Oleg
>
>     > On Aug 20, 2016, at 7:03 PM, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <[email protected]> wrote:
>     >
>     > Jira is https://issues.apache.org/jira/browse/NIFI-2614.
>     >
>     > Thanks,
>     >
>     > Chris McDermott
>     >
>     > Remote Business Analytics
>     > STaTS/StoreFront Remote
>     > HPE Storage
>     > Hewlett Packard Enterprise
>     > Mobile: +1 978-697-5315
>     >
>     >
>     >
>     > On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote)" <[email protected]> wrote:
>     >
>     >    I’ll raise a JIRA, Joe.
>     >
>     >    Thanks,
>     >
>     >    Chris McDermott
>     >
>     >    Remote Business Analytics
>     >    STaTS/StoreFront Remote
>     >    HPE Storage
>     >    Hewlett Packard Enterprise
>     >    Mobile: +1 978-697-5315
>     >
>     >
>     >
>     >    On 8/20/16, 6:52 PM, "Joe Witt" <[email protected]> wrote:
>     >
>     >        If no jira is raised sooner I'll raise one and get it sorted.
>     >
>     >        On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <
> [email protected]> wrote:
>     >
>     >> Hi Chris,
>     >> Sorry for not catching that code path. I am not sure if it is
> actually a
>     >> regression as I took a look at the 1.0.0-BETA code and it matches
> the
>     >> 0.7.0, specifically this comment block:
>     >>
>     >> /*
>     >> * We're using the default value from Kafka. We are using it to
> control the
>     >> * message size before it goes to to Kafka thus limiting possibility
> of a
>     >> * late failures in Kafka client.
>     >> */
>     >>
>     >> found at[1] leads me to believe it was intentional and not a
> regression.
>     >> Looking at the 0.6.1 release code it appears that PutKafka used a
> default
>     >> of 5 MB [2].
>     >>
>     >> I can speculate on the reasoning behind it, however, I will refrain
> from
>     >> opining on it as I was not involved in any of the conversations
> related to
>     >> the change and enforcement of the 1 MB max.
>     >>
>     >> [1]
>     >> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-
>     >> official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-
>     >> processors/src/main/java/org/apache/nifi/processors/kafka/
>     >> PublishingContext.java#L36-L41
>     >> [2]
>     >> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-
>     >> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>     >> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>     >>
>     >> Thanks,
>     >> Andrew
>     >>
>     >> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
>     >> STaTS/StorefrontRemote) <[email protected]> wrote:
>     >>
>     >>> Thanks, Andrew.
>     >>>
>     >>> I’ve set all of the right broker configs to allow larger messages.
>     >>> Believe me I spent a lot of time banging my head against the wall
>     >> thinking
>     >>> that the broker and topic configs were wrong.
>     >>>
>     >>> PublisingKafka uses PublishingContext.  That class has bean
> property
>     >>> called maxRequestSize, which defaults to 1048576.  As far as I can
> tell
>     >> the
>     >>> setMaxRequestSize() method is never called (except by some test
> code.)
>     >>> KafkaPublisher.publish() calls Max Record Size.getMaxRequestSize()
> and
>     >>> passes the result to the constructor for StreamDemarcator.   The
> publish
>     >>> method then calls the StreamDemarcator. getNextToken(), which in
> turns
>     >>> calls StreamDemarcator.fill() which compares the stream position
> against
>     >>> the maxRequestSize and throws the exception with this line.
>     >>>
>     >>> throw new IllegalStateException("Maximum allowed data size of " +
>     >>> this.maxDataSize + " exceeded.");
>     >>>
>     >>> Which matches what I see in the nifi-app.log file…
>     >>>
>     >>> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8]
>     >>> o.apache.nifi.processors.kafka.PutKafka
>     >>> java.lang.IllegalStateException: Maximum allowed data size of
> 1048576
>     >>> exceeded.
>     >>>        at org.apache.nifi.stream.io.util.StreamDemarcator.fill(
>     >> StreamDemarcator.java:153)
>     >>> ~[nifi-utils-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.stream.io.util.StreamDemarcator.
>     >>> nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(
>     >> KafkaPublisher.java:129)
>     >>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.processors.kafka.PutKafka$1.process(
>     >> PutKafka.java:315)
>     >>> ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.controller.repository.
>     >>> StandardProcessSession.read(StandardProcessSession.java:1851)
>     >>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.controller.repository.
>     >>> StandardProcessSession.read(StandardProcessSession.java:1822)
>     >>> ~[nifi-framework-core-0.7.0.jar:0.7.0]
>     >>>        at org.apache.nifi.processors.kafka.PutKafka.
>     >>> doRendezvousWithKafka(PutKafka.java:311)
> ~[nifi-kafka-processors-0.7.0.
>     >>> jar:0.7.0]
>     >>>        at org.apache.nifi.processors.kafka.PutKafka.
>     >>> rendezvousWithKafka(PutKafka.java:287)
> ~[nifi-kafka-processors-0.7.0.
>     >>> jar:0.7.0]
>     >>>        at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.
>     >>> onTrigger(AbstractKafkaProcessor.java:76)
> ~[nifi-kafka-processors-0.7.0.
>     >>> jar:0.7.0]
>     >>>        at org.apache.nifi.controller.StandardProcessorNode.
> onTrigger(
>     >>> StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.
> jar:0.7.0]
>     >>>        at org.apache.nifi.controller.tasks.
> ContinuallyRunProcessorTask.
>     >>> call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0.
>     >>> jar:0.7.0]
>     >>>        at org.apache.nifi.controller.tasks.
> ContinuallyRunProcessorTask.
>     >>> call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0.
>     >>> jar:0.7.0]
>     >>>        at org.apache.nifi.controller.scheduling.
>     >>> TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.
> java:127)
>     >>> [nifi-framework-core-0.7.0.jar:0.7.0]
>     >>>        at java.util.concurrent.Executors$RunnableAdapter.
>     >> call(Executors.java:511)
>     >>> [na:1.8.0_45]
>     >>>        at java.util.concurrent.FutureTask.runAndReset(
>     >> FutureTask.java:308)
>     >>> [na:1.8.0_45]
>     >>>        at java.util.concurrent.ScheduledThreadPoolExecutor$
>     >>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
> java:180)
>     >>> [na:1.8.0_45]
>     >>>        at java.util.concurrent.ScheduledThreadPoolExecutor$
>     >>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     >>> [na:1.8.0_45]
>     >>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(
>     >> ThreadPoolExecutor.java:1142)
>     >>> [na:1.8.0_45]
>     >>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>     >> ThreadPoolExecutor.java:617)
>     >>> [na:1.8.0_45]
>     >>>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>     >>>
>     >>> This occurs using PublishKafka, and PutKafka.  Setting the Max
> Record
>     >> Size
>     >>> property in the PutKafka processor has no affect on this.  Note
> the stack
>     >>> trace above is from the PutKafka processor with Max Record Size
> set to
>     >> 10MB.
>     >>>
>     >>> I believe that this a regression from 0.6.0.
>     >>>
>     >>> Chris McDermott
>     >>>
>     >>> Remote Business Analytics
>     >>> STaTS/StoreFront Remote
>     >>> HPE Storage
>     >>> Hewlett Packard Enterprise
>     >>> Mobile: +1 978-697-5315
>     >>>
>     >>>
>     >>>
>     >>> On 8/20/16, 3:48 PM, "Andrew Psaltis" <[email protected]>
> wrote:
>     >>>
>     >>>    Hi Chris,
>     >>>    Regarding the PutKafka processor looking at this block[1] of the
>     >>> PutKafka
>     >>>    code, it has a default size of 1 MB, but it does not restrict
> the
>     >>> size. The
>     >>>    DATA_SIZE_VALIDATOR does a sanity check and also enforces that
>     >>>    the supported value entered is the correct format <value> [B|
>     >>> KB|MB|GB|TB].
>     >>>    Later on in the code at this block[2], the value is set on the
> Kafka
>     >>>    config, again this does not enforce a value maximum.
>     >>>
>     >>>    In regards to the PublishKafka processor I do not see where it
>     >> accepts
>     >>> a
>     >>>    size nor restrict the size at all.
>     >>>
>     >>>    Have you adjusted the 'message.max.bytes' config value for your
>     >>> broker(s)?
>     >>>    The default value for that is 1 MB [3] (The url references the
> 0.8
>     >>> Kafka,
>     >>>    however I believe this default has been stable since the early
> days
>     >> of
>     >>> the
>     >>>    project.)
>     >>>
>     >>>    If you really do want to send messages that are larger than 1
> MB in
>     >>> size, I
>     >>>    would highly recommending reading this post[4] from Gwen
> Shapira.  It
>     >>> does
>     >>>    a great job of outlining the things you need to take into
>     >>> consideration.
>     >>>    This will also point you to the relevant configs in Kafka that
> will
>     >>> need to
>     >>>    be adjusted if you decide to go this route.
>     >>>
>     >>>
>     >>>    Thanks,
>     >>>    Andrew
>     >>>
>     >>>    [1]
>     >>>    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>     >>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>     >>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>     >>>    [2]
>     >>>    https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-
>     >>> nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/
>     >>> main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>     >>>    [3] https://kafka.apache.org/08/configuration.html
>     >>>    [4] http://ingest.tips/2015/01/21/
> handling-large-messages-kafka/
>     >>>
>     >>>    On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>     >>>    STaTS/StorefrontRemote) <[email protected]> wrote:
>     >>>
>     >>>> Hi folks,
>     >>>>
>     >>>>
>     >>>>
>     >>>> From experimentation and looking at the code it seems that the max
>     >>> message
>     >>>> size that can be sent via the PublishKafka and PutKafka processors
>     >>> in 0.7.0
>     >>>> is 1MB.  Can someone please confirm my read on this?
>     >>>>
>     >>>>
>     >>>>
>     >>>> Thanks,
>     >>>>
>     >>>>
>     >>>>
>     >>>> Chris McDermott
>     >>>>
>     >>>>
>     >>>>
>     >>>> Remote Business Analytics
>     >>>>
>     >>>> STaTS/StoreFront Remote
>     >>>>
>     >>>> HPE Storage
>     >>>>
>     >>>> Hewlett Packard Enterprise
>     >>>>
>     >>>> Mobile: +1 978-697-5315
>     >>>>
>     >>>>
>     >>>>
>     >>>>
>     >>>
>     >>>
>     >>>    --
>     >>>    Thanks,
>     >>>    Andrew
>     >>>
>     >>>    Subscribe to my book: Streaming Data <
> http://manning.com/psaltis>
>     >>>    <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>     >>>    twiiter: @itmdata <http://twitter.com/intent/
>     >> user?screen_name=itmdata>
>     >>>
>     >>>
>     >>>
>     >>
>     >>
>     >> --
>     >> Thanks,
>     >> Andrew
>     >>
>     >> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>     >> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>     >> twiiter: @itmdata <http://twitter.com/intent/
> user?screen_name=itmdata>
>     >>
>     >
>     >
>     >
>     >
>
>
>
>
  • Max Kafka message s... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
    • Re: Max Kafka ... Andrew Psaltis
      • Re: Max Ka... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
        • Re: Ma... Andrew Psaltis
          • Re... Joe Witt
            • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
              • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... Oleg Zhurakousky
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... Oleg Zhurakousky
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)
                • ... Joe Witt
                • ... McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)

Reply via email to