Repository: flink
Updated Branches:
  refs/heads/master adbf846f2 -> c39ad31f3


[FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb4c5e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb4c5e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb4c5e0

Branch: refs/heads/master
Commit: afb4c5e02c513a82d2ad7f7816065fdd93665e0e
Parents: adbf846
Author: Haohui Mai <whe...@apache.org>
Authored: Thu Mar 2 13:33:13 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Thu Mar 9 14:05:37 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  4 +
 .../connectors/kafka/Kafka09FetcherTest.java    | 84 ++++++++++++++++++++
 .../kafka/internals/AbstractFetcher.java        |  4 +
 3 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 0f700ab..331c9c7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the 
binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T 
deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+When a `DeserializationSchema` encounters a corrupted message, it has two options. It can either throw
+an `IOException`, which causes the pipeline to be restarted, or it can return `null`, in which case the
+Flink Kafka consumer silently skips the corrupted message.
+
 It is usually helpful to start from the `AbstractDeserializationSchema`, which 
takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a 
vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..61a8855 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,6 +31,7 @@ import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectingSourceContext;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -49,6 +52,8 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -419,6 +424,85 @@ public class Kafka09FetcherTest {
                assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
        }
 
+       @Test
+       public void testSkipCorruptedMessage() throws Exception {
+               // Checks that a record which the schema deserializes to null does not end up in the collected results.
+               // ----- some test data: three records on one partition, at offsets 15, 16 and 17 -----
+
+               final String topic = "test-topic";
+               final int partition = 3;
+               final byte[] payload = new byte[] {1, 2, 3, 4};
+
+               final List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
+                       new ConsumerRecord<>(topic, partition, 15, payload, 
payload), // offset 15: the schema below returns null for this record
+                       new ConsumerRecord<>(topic, partition, 16, payload, 
payload),
+                       new ConsumerRecord<>(topic, partition, 17, payload, 
"end".getBytes())); // value "end" is the string that isEndOfStream() below matches
+
+               final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
data = new HashMap<>();
+               data.put(new TopicPartition(topic, partition), records);
+
+               final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
+
+               // ----- the test consumer: a mock whose poll() always answers with the records built above -----
+
+               final KafkaConsumer<?, ?> mockConsumer = 
mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) {
+                               return consumerRecords;
+                       }
+               });
+
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); // hand out the mock whenever a KafkaConsumer is constructed
+
+               // ----- build a fetcher whose schema yields null at offset 15 and flags "end" as end-of-stream -----
+
+               ArrayList<String> results = new ArrayList<>();
+               SourceContext<String> sourceContext = new 
CollectingSourceContext<>(results, results); // presumably (lock object, output collection) — confirm against CollectingSourceContext
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new KafkaTopicPartition(topic, 
partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchema<String>() {
+
+                       @Override
+                       public String deserialize(byte[] messageKey, byte[] 
message,
+                                                                         
String topic, int partition, long offset) throws IOException {
+                               return offset == 15 ? null : new 
String(message); // null stands in for a corrupted message; every other offset is decoded as a String
+                       }
+
+                       @Override
+                       public boolean isEndOfStream(String nextElement) {
+                               return "end".equals(nextElement);
+                       }
+
+                       @Override
+                       public TypeInformation<String> getProducedType() {
+                               return BasicTypeInfo.STRING_TYPE_INFO;
+                       }
+               };
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                       sourceContext,
+                       partitionsWithInitialOffsets,
+                       null, /* periodic watermark extractor */
+                       null, /* punctuated watermark extractor */
+                       new TestProcessingTimeService(),
+                       10, /* watermark interval */
+                       this.getClass().getClassLoader(),
+                       true, /* checkpointing */
+                       "task_name",
+                       new UnregisteredMetricsGroup(),
+                       schema,
+                       new Properties(),
+                       0L,
+                       false);
+
+
+               // ----- run the fetcher (expected to return once the schema reports end-of-stream; the fetch loop itself is not visible here) -----
+
+               fetcher.runFetchLoop();
+               assertEquals(1, results.size()); // of the three records only one is expected to be collected: offset 15 maps to null, offset 17 is the "end" marker
+       }
+
        // 
------------------------------------------------------------------------
        //  test utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index e021881..76ce1a0 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -213,6 +213,10 @@ public abstract class AbstractFetcher<T, KPH> {
         * @param offset The offset of the record
         */
        protected void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset) throws Exception {
+               if (record == null) {
+                       return;
+               }
+
                if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                        // fast path logic, in case there are no watermarks
 

Reply via email to