Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r104312079
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
---
@@ -419,6 +424,164 @@ public void run() {
assertFalse("fetcher threads did not properly finish",
sourceContext.isStillBlocking());
}
+
+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+				Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+					String topic, int partition, long offset) throws IOException {
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				partitionsWithInitialOffsets,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+		// ----- run the fetcher -----
+
+		fetcher.runFetchLoop();
+		assertEquals(1, results.size());
+	}
+
+	@Test
+	public void testNullAsEOF() throws Exception {
--- End diff ---
I'm not sure this test is necessary. It essentially just verifies that the
fetch loop stops when `isEndOfStream` returns `true`; whether the
end-of-stream condition happens to be `element == null` is irrelevant to
what's being tested. We also already have a `runEndOfStreamTest` in
`KafkaConsumerTestBase`.
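For contrast, the first test above does exercise distinct behavior: the deserializer signals a corrupted record by returning `null`, and the fetch loop is expected to skip it without emitting anything or stopping. A minimal standalone sketch of that skip-on-null contract (plain Java, no Flink dependencies; `Deserializer` and `consume` are hypothetical names for illustration, not Flink APIs):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkipOnNullSketch {

	// Hypothetical stand-in for KeyedDeserializationSchema: null means "corrupted, skip".
	interface Deserializer<T> {
		T deserialize(byte[] message, long offset);
		boolean isEndOfStream(T element);
	}

	static List<String> consume() {
		Deserializer<String> schema = new Deserializer<String>() {
			@Override
			public String deserialize(byte[] message, long offset) {
				// Mirror the test: the record at offset 15 is treated as corrupted.
				return offset == 15 ? null : new String(message, StandardCharsets.UTF_8);
			}

			@Override
			public boolean isEndOfStream(String element) {
				return "end".equals(element);
			}
		};

		byte[][] payloads = {
				"a".getBytes(StandardCharsets.UTF_8),
				"b".getBytes(StandardCharsets.UTF_8),
				"end".getBytes(StandardCharsets.UTF_8)};
		long[] offsets = {15, 16, 17};

		List<String> results = new ArrayList<>();
		for (int i = 0; i < payloads.length; i++) {
			String value = schema.deserialize(payloads[i], offsets[i]);
			if (value == null) {
				continue; // corrupted record: skip it, do not emit, do not stop
			}
			if (schema.isEndOfStream(value)) {
				break; // end marker: stop without emitting it
			}
			results.add(value);
		}
		return results;
	}

	public static void main(String[] args) {
		System.out.println(consume());
	}
}
```

This mirrors the `assertEquals(1, results.size())` assertion in the diff: the record at offset 15 is skipped and the `"end"` marker terminates the loop without being emitted, leaving a single element.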