Hi, Selina, I assume that you were referring to "max.message.bytes" in Kafka producer config? There is no "max.message.size" config. If you were referring to the "max.message.bytes", it has nothing to do w/ the number of messages in a topic, it is the limit on a single message size in bytes in Kafka.
And have you attached VisualVM or JConsole to the Samza container to see whether you can find the metrics reporting "send_calls" etc.? That should tell you whether your StreamTask class actually executed the collector.send(). There are also many other metrics, such as "process-calls", "process-envelopes" etc. You can also turn on the debug log in your container by setting the log4j.xml log levels. On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com> wrote: > Hi, Yi: > > I am wondering if the problem can be fixed by the parameter " > max.message.size" at kafka.producer.ProducerConfig for the topic size? > > My Http Server send message to Kafka. The last message shown on > console is > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari > postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple > count=4269" > > However the Kafka got Exception from message 4244th > The error is below and Kafka do not accept any new message after this. > > "[2015-07-24 12:46:11,078] WARN > [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread], > Failed to find leader for Set([http-demo,0]) > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) > kafka.common.KafkaException: fetching topic metadata for topics > [Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > ... 3 more > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id > 21 for topics [Set(http-demo)] from broker > [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)" > > > After the Error: > I show the topic, it is right, but can not show the content by command line > > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh > --list --zookeeper localhost:2181 > http-demo > Selinas-MacBook-Pro:samza-Demo selina$ > deploy/kafka/bin/kafka-console-consumer.sh > --zookeeper localhost:2181 --from-beginning --topic http-demo > [2015-07-24 12:47:38,730] WARN > [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87], > no brokers found when trying to rebalance. > (kafka.consumer.ZookeeperConsumerConnector) > > Attached is my Kafka properties for server and producer. > > Your help is highly appreciated > > > Sincerely, > Selina > > > > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> wrote: > >> Hi, Selina, >> >> Your question is not clear. >> {quote} >> When the messages was send to Kafka by KafkaProducer, It always failed >> when the message more than 3000 - 4000 messages. >> {quote} >> >> What's failing? The error stack shows errors on the consumer side and you >> were referring to failures to produce to Kafka. Could you be more specific >> regarding to what's your failure scenario? >> >> -Yi >> >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucaree...@gmail.com> >> wrote: >> >> > Hi, >> > >> > When the messages was send to Kafka by KafkaProducer, It always >> failed >> > when the message more than 3000 - 4000 messages. The error is shown >> below. >> > I am wondering if any topic size I need to set at Samza configuration? >> > >> > >> > [2015-07-23 17:30:03,792] WARN >> > >> > >> [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread], >> > Failed to find leader for Set([http-demo,0]) >> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) >> > kafka.common.KafkaException: fetching topic metadata for topics >> > [Set(http-demo)] from broker [ArrayBuffer()] failed >> > at >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >> > at >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) >> > at >> > >> > >> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) >> > at >> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) >> > ^CConsumed 4327 messages >> > >> > Your reply and comment will be highly appreciated. >> > >> > >> > Sincerely, >> > Selina >> > >> > >