Repository: flink Updated Branches: refs/heads/master adbf846f2 -> c39ad31f3
[FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb4c5e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb4c5e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb4c5e0 Branch: refs/heads/master Commit: afb4c5e02c513a82d2ad7f7816065fdd93665e0e Parents: adbf846 Author: Haohui Mai <whe...@apache.org> Authored: Thu Mar 2 13:33:13 2017 -0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Thu Mar 9 14:05:37 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kafka.md | 4 + .../connectors/kafka/Kafka09FetcherTest.java | 84 ++++++++++++++++++++ .../kafka/internals/AbstractFetcher.java | 4 + 3 files changed, 92 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 0f700ab..331c9c7 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)` method gets called for each Kafka message, passing the value from Kafka. +There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can +either throw an `IOException`, which causes the pipeline to be restarted, or it can return `null`, in which case the Flink +Kafka consumer silently skips the corrupted message. + It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the produced Java/Scala type to Flink's type system. 
Users that implement a vanilla `DeserializationSchema` need to implement the `getProducedType(...)` method themselves. http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index 49144e6..61a8855 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -29,6 +31,7 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.CollectingSourceContext; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -49,6 +52,8 @@ import 
org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -419,6 +424,85 @@ public class Kafka09FetcherTest { assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); } + @Test + public void testSkipCorruptedMessage() throws Exception { + + // ----- some test data: three records on one partition at offsets 15, 16 and 17; the value at offset 17 is "end" ----- + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( + new ConsumerRecord<>(topic, partition, 15, payload, payload), + new ConsumerRecord<>(topic, partition, 16, payload, payload), + new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes())); + + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); + + // ----- the test consumer: a mock whose poll() always returns the records above; whenNew() hands it out in place of a real KafkaConsumer ----- + + final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- build a fetcher whose schema returns null for offset 15 (simulating a corrupted message) and treats "end" as end-of-stream ----- + + ArrayList<String> results = new ArrayList<>(); + SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() 
{ + + @Override + public String deserialize(byte[] messageKey, byte[] message, + String topic, int partition, long offset) throws IOException { + return offset == 15 ? null : new String(message); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return "end".equals(nextElement); + } + + @Override + public TypeInformation<String> getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + }; + + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // ----- run the fetcher: exactly one record (presumably offset 16) should be collected, since the null from offset 15 must be skipped and "end" only signals end-of-stream ----- + + fetcher.runFetchLoop(); + assertEquals(1, results.size()); + } + // ------------------------------------------------------------------------ // test utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index e021881..76ce1a0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -213,6 +213,10 @@
public abstract class AbstractFetcher<T, KPH> { * @param offset The offset of the record */ protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception { + if (record == null) { + return; + } + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { // fast path logic, in case there are no watermarks