In kafka_0.8.3.0:

kafkaProducer = new KafkaProducer<>(properties, new ByteArraySerializer(),
        new ByteArraySerializer());
kafkaProducer.flush();

You can call flush() after sending every few messages.
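
A minimal sketch of the "flush after every few messages" pattern, so the idea can be tried without a broker. Everything named here is hypothetical: `BufferedSender` stands in for the two producer calls (`send` and `flush`), `InMemorySender` is a fake that only "delivers" on flush, and the interval of 500 is an arbitrary choice. With the real client, the assumption is that `kafkaProducer.send(...)` and `kafkaProducer.flush()` would take the place of the interface methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two producer calls used in this sketch. */
interface BufferedSender {
    void send(String message); // in an async producer this only buffers the message
    void flush();              // pushes out everything buffered so far
}

/** Fake sender for illustration: messages count as delivered only after flush(). */
final class InMemorySender implements BufferedSender {
    final List<String> buffered = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();
    int flushCalls = 0;

    @Override
    public void send(String message) {
        buffered.add(message);
    }

    @Override
    public void flush() {
        delivered.addAll(buffered);
        buffered.clear();
        flushCalls++;
    }
}

final class FlushEveryN {
    /** Sends every message, flushing after each flushInterval sends and once at the end. */
    static void sendAll(BufferedSender sender, List<String> messages, int flushInterval) {
        if (flushInterval <= 0) {
            throw new IllegalArgumentException("flushInterval must be positive");
        }
        int sinceLastFlush = 0;
        for (String message : messages) {
            sender.send(message);
            sinceLastFlush++;
            if (sinceLastFlush == flushInterval) {
                sender.flush();
                sinceLastFlush = 0;
            }
        }
        // Final flush so a partial last batch is not left in the buffer on exit.
        if (sinceLastFlush > 0) {
            sender.flush();
        }
    }

    public static void main(String[] args) {
        InMemorySender sender = new InMemorySender();
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 12000; i++) {
            messages.add("KAFKA_" + i);
        }
        sendAll(sender, messages, 500);
        System.out.println("delivered=" + sender.delivered.size()
                + " stillBuffered=" + sender.buffered.size());
    }
}
```

The final flush is the part that matters when the program exits right after the send loop: anything still sitting in the buffer at that point would otherwise never reach the broker.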







At 2015-11-12 17:36:24, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>Hi  Prabhjot
>
>The messages are "Thread1_kafka_1" and "Thread2_kafka_1", something like
>that.
>
>The GetOffsetShell report is below:
>
>[kafka@dn-01 bin]$ ./kafka-run-class.sh kafka.tools.GetOffsetShell
>--broker-list dn-01:9092 --time -1 --topic kafka-test
>kafka-test:0:12529261
>
>
>
>
>@Jinxing
>
>Can you share your flush example with me?  How can I avoid losing my
>messages?
>Thanks.
>
>
>
>Best regards
>Hawin
>
>
>
>On Thu, Nov 12, 2015 at 1:00 AM, jinxing <jinxing6...@126.com> wrote:
>
>> There is a flush API on the producer; you can call it to prevent
>> message loss.
>>
>>
>> Maybe it can help.
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2015-11-12 16:43:54, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>> >Hi  Jinxing
>> >
>> >I don't think we can resolve this issue by adding producers.  If I
>> >added more producers, it would lose more messages.
>> >
>> >I just tested two producers.
>> >Thread Producer 1 has 83064 messages on the producer side and 82273
>> >messages on the consumer side.
>> >Thread Producer 2 has 89844 messages on the producer side and 88892
>> >messages on the consumer side.
>> >
>> >Thanks.
>> >
>> >
>> >
>> >Best regards
>> >Hawin
>> >
>> >
>> >On Thu, Nov 12, 2015 at 12:24 AM, jinxing <jinxing6...@126.com> wrote:
>> >
>> >> Maybe there are some changes in 0.9.0.0.
>> >>
>> >>
>> >> Still, you can try increasing the producer sending rate and see whether
>> >> messages are lost without any exception.
>> >>
>> >>
>> >> Note that to increase the producer sending rate, you must have enough
>> >> producer 'power'.
>> >>
>> >>
>> >> In my case, I have 50 producers sending messages at the same time : )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> At 2015-11-12 16:16:23, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>> >> >Hi  Jinxing
>> >> >
>> >> >I am using kafka_2.10-0.9.0.0-SNAPSHOT.  I downloaded the source code
>> >> >and installed it last week.
>> >> >
>> >> >I saw that 97446 messages were sent to Kafka successfully.
>> >> >
>> >> >So far, I have not found any failed messages.
>> >> >
>> >> >
>> >> >
>> >> >Best regards
>> >> >Hawin
>> >> >
>> >> >On Thu, Nov 12, 2015 at 12:07 AM, jinxing <jinxing6...@126.com> wrote:
>> >> >
>> >> >> Hi, what version are you using?
>> >> >>
>> >> >>
>> >> >> I am using 0.8.2.0, and I encountered this problem before.
>> >> >>
>> >> >>
>> >> >> It seems that if the message rate on the producer side is too high,
>> >> >> some of the messages will be lost.
>> >> >>
>> >> >>
>> >> >> Also, I found that the callback method of the producer 'send' API is
>> >> >> not reliable.
>> >> >>
>> >> >>
>> >> >> Only successfully sent messages trigger the callback; the failed
>> >> >> ones don't.
>> >> >>
>> >> >>
>> >> >> Have you seen this?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> At 2015-11-12 16:01:17, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>> >> >> >Hi  All
>> >> >> >
>> >> >> >I sent messages to Kafka for one minute.  I found 97446 messages
>> >> >> >on the producer side and 96896 messages on the consumer side for
>> >> >> >Case 1.
>> >> >> >I also tried Case 2 and faced the same issue.  The numbers do not
>> >> >> >match between producer and consumer.
>> >> >> >Can someone take a look at this issue?
>> >> >> >Thanks.
>> >> >> >
>> >> >> >
>> >> >> >Case 1:
>> >> >> >
>> >> >> >long startTime = System.currentTimeMillis();
>> >> >> >long maxDurationInMilliseconds = 1 * 60 * 1000;
>> >> >> >int messageNo = 0;
>> >> >> >while (true) {
>> >> >> >    if (System.currentTimeMillis() <= startTime
>> >> >> >            + maxDurationInMilliseconds) {
>> >> >> >        messageNo++;
>> >> >> >        String messageStr = "KAFKA_" + messageNo;
>> >> >> >        System.out.println("Message: " + messageNo);
>> >> >> >        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> >> >> >    } else {
>> >> >> >        producer.close();
>> >> >> >        System.out.println("Total kafka Message: " + messageNo);
>> >> >> >        break;
>> >> >> >    }
>> >> >> >}
>> >> >> >
>> >> >> >
>> >> >> >Case 2:
>> >> >> >
>> >> >> >for (int i = 1; i <= 12000; i++) {
>> >> >> >    String messageStr = "KAFKA_" + i;
>> >> >> >    producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> >> >> >}
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >Best regards
>> >> >> >Hawin
>> >> >>
>> >>
>>
