Thanks for replying, Koji. After your email I looked at the Kafka logs again
and found that message.max.bytes was set to accept messages of at most 1 MB.
I increased it to 10 MB, and messages are now being pushed to Kafka.
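
For anyone reading this later, here is a minimal sketch of why the message
was rejected. It is only an illustration of the size arithmetic: the function
and variable names are hypothetical, not NiFi or Kafka APIs, and it assumes
"1 MB" and "10 MB" mean 1048576 and 10485760 bytes. The message size is the
one reported in the quoted log.

```python
# Illustrative model only: these names are hypothetical, not NiFi or Kafka APIs.
MB = 1024 * 1024


def first_rejecting_limit(message_size, limits):
    """Return the name of the first limit the message exceeds, or None if it fits all."""
    for name, max_bytes in limits:
        if message_size > max_bytes:
            return name
    return None


flowfile_size = 2160613  # bytes, as reported in the quoted log

# Limits before the fix: the processor allowed 10 MB, the broker only 1 MB.
before = [
    ("processor Max Request Size", 10 * MB),
    ("broker message.max.bytes", 1 * MB),
]
# Limits after raising message.max.bytes to 10 MB.
after = [
    ("processor Max Request Size", 10 * MB),
    ("broker message.max.bytes", 10 * MB),
]

print(first_rejecting_limit(flowfile_size, before))
print(first_rejecting_limit(flowfile_size, after))
```

The point is that every size limit on the path has to be at least as large
as the biggest message, so raising only one of them is not enough.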

Thanks for helping!

On Sun, Nov 19, 2017 at 7:01 PM, Koji Kawamura <ijokaruma...@gmail.com>
wrote:

> Hi Mayank,
>
> I've tried to reproduce the issue, but to no avail so far.
> PublishKafka_0_10 uses the specified Max Request Size as expected, and
> I got the exception when the incoming message size exceeded the
> configured size.
> I was also able to publish messages of 2.08 MB with a 10 MB Max
> Request Size.
>
> The stack trace you reported originates in NiFi's AbstractDemarcator
> (StreamDemarcator), which throws when it reads bytes from the incoming
> FlowFile content and the read size exceeds maxDataSize.
> StreamDemarcator.maxDataSize is set to the PublishKafka_0_10
> 'Max Request Size' property.
>
> Does this issue still happen? If so, do you mind sharing your
> processor configuration by exporting as a template?
>
> Thanks,
> Koji
>
>
> On Sat, Nov 18, 2017 at 1:54 AM, mayank rathi <mayank.ra...@gmail.com>
> wrote:
> > Hello All,
> >
> > I am getting this error in the PublishKafka_0_10 processor for a message of
> > size 2.08 MB. I have set Max Request Size to 10 MB in the processor
> > properties and max.request.size to 10 MB in Kafka's server.properties.
> > After rebooting the Kafka broker I can see max.request.size = 10 MB in the
> > Kafka logs, but I am still getting the error below.
> >
> > What am I missing here?
> >
> > 2017-11-17 11:07:47,966 ERROR [Timer-Driven Process Thread-4]
> > o.a.n.p.kafka.pubsub.PublishKafka_0_10
> > PublishKafka_0_10[id=e6d932d9-97ae-1647-aa8f-86d07791ce25]
> > Failed to send all message for StandardFlowFileRecord[uuid=
> > fa2399e5-bea5-4113-b58b-6cdef228733c,claim=StandardContentClaim
> > [resourceClaim=StandardResourceClaim[id=1510934860019-132,
> > container=default, section=132], offset=0, length=2160613],offset=0,name=
> > 12337127439954063,size=2160613] to Kafka; routing to failure due to
> > org.apache.nifi.stream.io.exception.TokenTooLargeException: A message in
> > the stream exceeds the maximum allowed message size of 1048576 bytes.: {}
> > org.apache.nifi.stream.io.exception.TokenTooLargeException: A message in
> > the stream exceeds the maximum allowed message size of 1048576 bytes.
> >     at org.apache.nifi.stream.io.util.AbstractDemarcator.extractDataToken(AbstractDemarcator.java:157)
> >     at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:129)
> >     at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:78)
> >     at org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10$1.process(PublishKafka_0_10.java:334)
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
> >     at org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10.onTrigger(PublishKafka_0_10.java:330)
> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > Thanks and Regards
> > Mayank
> >
> > --
> > NOTICE: This email message is for the sole use of the intended recipient(s)
> > and may contain confidential and privileged information. Any unauthorized
> > review, use, disclosure or distribution is prohibited. If you are not the
> > intended recipient, please contact the sender by reply email and destroy
> > all copies of the original message.
>
