Hi,

First, here's a handy slide-deck on avoiding data loss in Kafka:
http://www.slideshare.net/gwenshap/kafka-reliability-when-it-absolutely-positively-has-to-be-there

Note configuration parameters like the number of retries.
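For example, with the old (0.8) producer config you quote below, the durability-related settings look roughly like this (illustrative values only, not a tuning recommendation):

```properties
# wait for acks from all in-sync replicas before a send counts as successful
request.required.acks=-1
# retry transient send failures this many times instead of dropping the message
message.send.max.retries=3
```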

Also, it looks like you are sending data to Kafka asynchronously, but you
don't have a callback - so if send() fails, you have no way of updating the
count, logging, or anything else. I recommend taking a look at the callback
API for sending data and improving your test to at least record send
failures.
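A minimal sketch of what counting send failures with a callback could look like. The types here are hypothetical stand-ins so the sketch runs standalone; with the real new-producer client you would pass a `Callback` to `producer.send(record, callback)`, and its `onCompletion(RecordMetadata, Exception)` receives a non-null exception when the send fails:

```java
import java.util.concurrent.atomic.AtomicLong;

// Schematic sketch of the callback pattern, with stand-in types (not the
// real Kafka client classes). The point: a failure handed to the callback
// is counted instead of being silently dropped.
public class SendCallbackSketch {

    interface Callback {
        void onCompletion(Exception exception); // null means success
    }

    static final AtomicLong failedSends = new AtomicLong();

    // Stand-in for an async send: reports the outcome to the callback.
    static void send(String message, boolean simulateFailure, Callback cb) {
        cb.onCompletion(simulateFailure
                ? new RuntimeException("send failed: " + message)
                : null);
    }

    public static void main(String[] args) {
        Callback counting = e -> {
            if (e != null) failedSends.incrementAndGet();
        };
        send("KAFKA_1", false, counting);
        send("KAFKA_2", true, counting);
        System.out.println("failed sends: " + failedSends.get());
    }
}
```

With a counter like this on the producer side, the test can compare "attempted minus failed" against what the consumer sees, instead of comparing raw attempt counts.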

Gwen


On Thu, Nov 12, 2015 at 11:10 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:

> Hi  Pradeep
>
> Here is my configuration
>
> ############################# Producer Basics #############################
>
> # list of brokers used for bootstrapping knowledge about the rest of the
> # cluster
> # format: host1:port1,host2:port2 ...
> metadata.broker.list=localhost:9092
>
> # name of the partitioner class for partitioning events; the default
> # partitioner spreads data randomly
> #partitioner.class=
>
> # specifies whether the messages are sent asynchronously (async) or
> # synchronously (sync)
> producer.type=sync
>
> # specify the compression codec for all data generated: none, gzip,
> # snappy, lz4.
> # the old config values work as well: 0, 1, 2, 3 for none, gzip,
> # snappy, lz4, respectively
> compression.codec=none
>
> # message encoder
> serializer.class=kafka.serializer.DefaultEncoder
>
> # allow topic level compression
> #compressed.topics=
>
> ############################# Async Producer #############################
> # maximum time, in milliseconds, for buffering data on the producer queue
> #queue.buffering.max.ms=
>
> # the maximum size of the blocking queue for buffering on the producer
> #queue.buffering.max.messages=
>
> # Timeout for event enqueue:
> # 0: events will be enqueued immediately or dropped if the queue is full
> # -ve: enqueue will block indefinitely if the queue is full
> # +ve: enqueue will block up to this many milliseconds if the queue is full
> #queue.enqueue.timeout.ms=
>
> # the number of messages batched at the producer
> #batch.num.messages=
>
>
>
> @Jinxing
>
> I could not find Kafka 0.8.3.0 on
> http://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10.
> The latest release is 0.8.2.2.
> If we flush the producer all the time, I think it will impact our
> performance.
> I just want to make sure that every message from the producer can also
> be found in the consumer.
> We cannot lose so many messages in our system.
>
>
>
> Best regards
> Hawin
>
>
> On Thu, Nov 12, 2015 at 7:02 AM, jinxing <jinxing6...@126.com> wrote:
>
> > I have 3 brokers;
> > the ack configuration is -1 (all), meaning a message is sent
> > successfully only after getting every broker's ack;
> > is this a bug?
> >
> >
> >
> >
> >
> >
> >
> > At 2015-11-12 21:08:49, "Pradeep Gollakota" <pradeep...@gmail.com>
> > wrote:
> > >What is your producer configuration? Specifically, how many acks are you
> > >requesting from Kafka?
> > >
> > >On Thu, Nov 12, 2015 at 2:03 AM, jinxing <jinxing6...@126.com> wrote:
> > >
> > >> in kafka_0.8.3.0:
> > >> kafkaProducer = new KafkaProducer<>(properties,
> > >>         new ByteArraySerializer(), new ByteArraySerializer());
> > >> kafkaProducer.flush();
> > >> you can call flush() after sending every few messages;
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> At 2015-11-12 17:36:24, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
> > >> >Hi  Prabhjot
> > >> >
> > >> >The messages are "Thread1_kafka_1" and "Thread2_kafka_1", something
> > >> >like that.
> > >> >
> > >> >For GetOffsetShell report below:
> > >> >
> > >> >[kafka@dn-01 bin]$ ./kafka-run-class.sh kafka.tools.GetOffsetShell
> > >> >--broker-list dn-01:9092 --time -1 --topic kafka-test
> > >> >kafka-test:0:12529261
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >@Jinxing
> > >> >
> > >> >Can you share your flush example with me?  How do I avoid losing
> > >> >messages?
> > >> >Thanks.
> > >> >
> > >> >
> > >> >
> > >> >Best regards
> > >> >Hawin
> > >> >
> > >> >
> > >> >
> > >> >On Thu, Nov 12, 2015 at 1:00 AM, jinxing <jinxing6...@126.com>
> > >> >wrote:
> > >> >
> > >> >> there is a flush API on the producer; you can call it to prevent
> > >> >> message loss;
> > >> >>
> > >> >>
> > >> >> maybe it can help;
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> At 2015-11-12 16:43:54, "Hawin Jiang" <hawin.ji...@gmail.com>
> > >> >> wrote:
> > >> >> >Hi  Jinxing
> > >> >> >
> > >> >> >I don't think we can resolve this issue by adding producers. If I
> > >> >> >add more producers, it should lose more messages.
> > >> >> >
> > >> >> >I just tested two producers.
> > >> >> >Thread Producer 1 had 83064 messages on the producer side and
> > >> >> >82273 messages on the consumer side;
> > >> >> >Thread Producer 2 had 89844 messages on the producer side and
> > >> >> >88892 messages on the consumer side.
> > >> >> >
> > >> >> >Thanks.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >Best regards
> > >> >> >Hawin
> > >> >> >
> > >> >> >
> > >> >> >On Thu, Nov 12, 2015 at 12:24 AM, jinxing <jinxing6...@126.com>
> > >> >> >wrote:
> > >> >> >
> > >> >> >> maybe there are some changes in 0.9.0.0;
> > >> >> >>
> > >> >> >>
> > >> >> >> but you can still try increasing the producer sending rate, and
> > >> >> >> see if messages are lost without any exception;
> > >> >> >>
> > >> >> >>
> > >> >> >> note that, to increase the producer sending rate, you must have
> > >> >> >> enough producer 'power';
> > >> >> >>
> > >> >> >>
> > >> >> >> in my case, I have 50 producers sending messages at the same
> > >> >> >> time : )
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> At 2015-11-12 16:16:23, "Hawin Jiang" <hawin.ji...@gmail.com>
> > >> >> >> wrote:
> > >> >> >> >Hi  Jinxing
> > >> >> >> >
> > >> >> >> >I am using kafka_2.10-0.9.0.0-SNAPSHOT.  I downloaded the
> > >> >> >> >source code and installed it last week.
> > >> >> >> >
> > >> >> >> >I saw 97446 messages have been sent to kafka successfully.
> > >> >> >> >
> > >> >> >> >So far, I have not found any failed messages.
> > >> >> >> >
> > >> >> >> >
> > >> >> >> >
> > >> >> >> >Best regards
> > >> >> >> >Hawin
> > >> >> >> >
> > >> >> >> >On Thu, Nov 12, 2015 at 12:07 AM, jinxing
> > >> >> >> ><jinxing6...@126.com> wrote:
> > >> >> >> >
> > >> >> >> >> Hi, what version are you using ?
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> I am using 0.8.2.0, and I encountered this problem before;
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> it seems that if the message rate on the producer side is too
> > >> >> >> >> high, some of the messages will be lost;
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> also, I found that the callback of the producer 'send' API is
> > >> >> >> >> not reliable;
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> only successfully sent messages trigger the callback; the
> > >> >> >> >> failed ones don't;
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> did you see this too?
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> At 2015-11-12 16:01:17, "Hawin Jiang" <hawin.ji...@gmail.com>
> > >> >> >> >> wrote:
> > >> >> >> >> >Hi  All
> > >> >> >> >> >
> > >> >> >> >> >I sent messages to Kafka for one minute.  I found 97446
> > >> >> >> >> >messages on the producer side and 96896 messages on the
> > >> >> >> >> >consumer side for Case 1.
> > >> >> >> >> >I also tried Case 2 and faced the same issue.  The numbers
> > >> >> >> >> >do not match between producer and consumer.
> > >> >> >> >> >Can someone take a look at this issue?
> > >> >> >> >> >Thanks.
> > >> >> >> >> >
> > >> >> >> >> >
> > >> >> >> >> >Case 1:
> > >> >> >> >> >
> > >> >> >> >> >long startTime = System.currentTimeMillis();
> > >> >> >> >> >long maxDurationInMilliseconds = 1 * 60 * 1000;
> > >> >> >> >> >int messageNo = 0;
> > >> >> >> >> >while (true) {
> > >> >> >> >> >    if (System.currentTimeMillis() <= startTime
> > >> >> >> >> >            + maxDurationInMilliseconds) {
> > >> >> >> >> >        messageNo++;
> > >> >> >> >> >        String messageStr = "KAFKA_" + messageNo;
> > >> >> >> >> >        System.out.println("Message: " + messageNo);
> > >> >> >> >> >        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
> > >> >> >> >> >    } else {
> > >> >> >> >> >        producer.close();
> > >> >> >> >> >        System.out.println("Total kafka Message: " + messageNo);
> > >> >> >> >> >        break;
> > >> >> >> >> >    }
> > >> >> >> >> >}
> > >> >> >> >> >
> > >> >> >> >> >
> > >> >> >> >> >Case 2:
> > >> >> >> >> >
> > >> >> >> >> >for (int i = 1; i <= 12000; i++) {
> > >> >> >> >> >    String messageStr = "KAFKA_" + i;
> > >> >> >> >> >    producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
> > >> >> >> >> >}
> > >> >> >> >> >
> > >> >> >> >> >
> > >> >> >> >> >Best regards
> > >> >> >> >> >Hawin
> > >> >> >> >>
> > >> >> >>
> > >> >>
> > >>
> >
>
