in kafka_0.8.3.0: kafkaProducer = new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer()); kafkaProducer.flush(); you can call the flush after sending every few messages;
At 2015-11-12 17:36:24, "Hawin Jiang" <hawin.ji...@gmail.com> wrote: >Hi Prabhjot > >The messages are "Thread1_kafka_1" and "Thread2_kafka_1". Something like >that. > >For GetOffsetShell report below: > >[kafka@dn-01 bin]$ ./kafka-run-class.sh kafka.tools.GetOffsetShell >--broker-list dn-01:9092 --time -1 --topic kafka-test >kafka-test:0:12529261 > > > > >@Jinxing > >Can you share your flush example to me? How to avoid lost my messages? >Thanks. > > > >Best regards >Hawin > > > >On Thu, Nov 12, 2015 at 1:00 AM, jinxing <jinxing6...@126.com> wrote: > >> there is a flush api of the producer, you can call this to prevent >> messages lost; >> >> >> maybe it can help; >> >> >> >> >> >> >> >> >> At 2015-11-12 16:43:54, "Hawin Jiang" <hawin.ji...@gmail.com> wrote: >> >Hi Jinxing >> > >> >I don't think we can resolve this issue by increasing producers. if I >> >increased more producers, it should lost more messages. >> > >> >I just test two producers. >> >Thread Producer 1 has 83064 messages in producer side and 82273 messages >> in >> >consumer side >> >Thread Producer 2 has 89844 messages in producer side and 88892 messages >> in >> >consumer side. >> > >> >Thanks. >> > >> > >> > >> >Best regards >> >Hawin >> > >> > >> >On Thu, Nov 12, 2015 at 12:24 AM, jinxing <jinxing6...@126.com> wrote: >> > >> >> maybe there some changes in 0.9.0.0; >> >> >> >> >> >> but still you can try increase producer sending rate, and see if there >> are >> >> message lost but no exception; >> >> >> >> >> >> note that, to increase the producer sending rate, you must have enough >> >> producer 'power'; >> >> >> >> >> >> in my case, I have 50 producer sending message at the same time : ) >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> At 2015-11-12 16:16:23, "Hawin Jiang" <hawin.ji...@gmail.com> wrote: >> >> >Hi Jinxing >> >> > >> >> >I am using kafka_2.10-0.9.0.0-SNAPSHOT. I have downloaded source code >> and >> >> >installed it last week. >> >> > >> >> >I saw 97446 messages have been sent to kafka successfully. >> >> > >> >> >So far, I have not found any failed messages. >> >> > >> >> > >> >> > >> >> >Best regards >> >> >Hawin >> >> > >> >> >On Thu, Nov 12, 2015 at 12:07 AM, jinxing <jinxing6...@126.com> wrote: >> >> > >> >> >> Hi, what version are you using ? >> >> >> >> >> >> >> >> >> i am using 0.8.2.0, and I encountered this problem before; >> >> >> >> >> >> >> >> >> it seems that if the message rate of the producer side is to high, >> some >> >> of >> >> >> the messages will lost; >> >> >> >> >> >> >> >> >> also, i found that the callback method of the producer 'send' API is >> not >> >> >> reliable; >> >> >> >> >> >> >> >> >> only successful sent message will trigger the callback, but the >> failed >> >> >> ones don't; >> >> >> >> >> >> >> >> >> you saw this? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> At 2015-11-12 16:01:17, "Hawin Jiang" <hawin.ji...@gmail.com> wrote: >> >> >> >Hi All >> >> >> > >> >> >> >I have sent messages to Kafka for one minute. I found 97446 >> messages >> >> in >> >> >> >producer side and 96896 messages in consumer side for Case 1. >> >> >> >I also tried case 2. I have faced the same issues. The number is >> not >> >> >> match >> >> >> >between producer and consumer. >> >> >> >Can someone take a look at this issue? >> >> >> >Thanks. >> >> >> > >> >> >> > >> >> >> >Case 1: >> >> >> > >> >> >> >long startTime = System.currentTimeMillis(); >> >> >> >long maxDurationInMilliseconds = 1 * 60 * 1000; >> >> >> >int messageNo = 0; >> >> >> >while (true) { >> >> >> >if (System.currentTimeMillis() <= startTime >> >> >> >+ maxDurationInMilliseconds) { >> >> >> >messageNo++; >> >> >> >String messageStr = "KAFKA_"+messageNo; >> >> >> >System.out.println("Message: "+messageNo); >> >> >> >producer.send(new KeyedMessage<Integer, String>(topic,messageStr)); >> >> >> >} else { >> >> >> >producer.close(); >> >> >> >System.out.println("Total kafka Message: "+messageNo); >> >> >> >break; >> >> >> >} >> >> >> >} >> >> >> > >> >> >> > >> >> >> >Case 2: >> >> >> > >> >> >> >for (int i=1;i<=12000;i++) >> >> >> >String messageStr = "KAFKA_"+i; >> >> >> >producer.send(new KeyedMessage<Integer, String>(topic,messageStr)); >> >> >> > >> >> >> > >> >> >> > >> >> >> >Best regards >> >> >> >Hawin >> >> >> >> >> >>