Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` count that is incremented on each 
invoke() and decremented on each completed callback; it is used to decide 
whether the producer needs to wait on pending callbacks at checkpoints.
A checkpoint should only be considered successful if, after flushing, 
`pendingRecords == 0` and `asyncException == null` (currently, we're only 
checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the 
`snapshotState` method, both before flushing and again after flushing once 
`pendingRecords` reaches 0.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.
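
Roughly, the fix could look like the sketch below (illustrative only; the 
helper name and the exact snapshotState signature are simplified here, this 
is not the actual patch):

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Fail the checkpoint early if an earlier send callback has already
    // reported an exception.
    checkAndRethrowAsyncException();

    // Wait until every record handed to the KafkaProducer has been
    // acknowledged, i.e. pendingRecords has dropped back to 0.
    flush();

    // A callback may have failed while we were waiting, so check once more
    // before the checkpoint is allowed to succeed.
    checkAndRethrowAsyncException();
}

// Hypothetical helper: rethrow the exception captured by the send callback.
private void checkAndRethrowAsyncException() throws Exception {
    Exception e = asyncException;
    if (e != null) {
        asyncException = null;
        throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
    }
}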

Cheers,
Gordon

On February 3, 2017 at 6:05:23 AM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Ninad,

thanks for reporting the issue. It also looks to me as if exceptions can go 
unnoticed under certain circumstances. For example, if a write operation 
fails, this sets the asyncException field, which is not checked before the 
next invoke call happens. If a checkpoint happens in the meantime, it will 
pass and mark all messages up to this point as successfully processed. Only 
after the checkpoint will the producer fail. And this constitutes data loss 
imho.

I've looped in Robert and Gordon, who are more familiar with the Kafka 
producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad <nni...@gmail.com> wrote:
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and it seems Flink isn't handling it. In this case there will
be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
            }
            acknowledgeMessage();
        }
    };
}
else {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;
            }
            acknowledgeMessage();
        }
    };
}

Here are the scenarios we've identified that will cause data loss:

- All Kafka brokers are down.
  In this case, before appending a message to its buffer, the KafkaProducer
  tries to fetch metadata. If it isn't able to fetch the metadata within the
  configured timeout, it throws an exception.
- Memory records not writable (an existing bug in the Kafka 0.9.0.1 library):
  https://issues.apache.org/jira/browse/KAFKA-3594

In both of the above cases, the KafkaProducer won't retry, and Flink will
ignore the messages. The messages aren't even logged; the exception is, but
not the messages that failed.

Possible workarounds (Kafka settings):

- A very high value for the metadata timeout (metadata.fetch.timeout.ms)
- A very high value for the buffer expiry (request.timeout.ms)

We're still investigating the possible side effects of changing the above
Kafka settings.
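
For reference, this is roughly how we'd pass those settings to the sink (a 
sketch only, using the Kafka 0.9 connector; the broker addresses, topic name 
and timeout values are placeholders, not tested recommendations):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSinkConfig {

    // Sketch: wiring the producer settings into the Flink Kafka 0.9 sink;
    // all concrete values below are placeholders.
    public static void addKafkaSink(DataStream<String> stream) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

        // How long send() may block waiting for topic metadata before it
        // fails (Kafka 0.9 producer setting). Placeholder value.
        producerProps.setProperty("metadata.fetch.timeout.ms", "600000");

        // How long the producer waits for a broker response before it
        // expires buffered requests. Placeholder value.
        producerProps.setProperty("request.timeout.ms", "600000");

        // "output-topic" is a placeholder topic name.
        stream.addSink(new FlinkKafkaProducer09<>(
                "output-topic", new SimpleStringSchema(), producerProps));
    }
}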

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?

Thanks.



