Abdul created FLINK-33001:
-----------------------------

Summary: KafkaSource in batch mode failing with exception if topic partition is empty
Key: FLINK-33001
URL: https://issues.apache.org/jira/browse/FLINK-33001
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1, 1.14.6, 1.12.7
Environment: The only workaround that currently works is to change the log level of KafkaPartitionSplitReader from DEBUG to INFO:
{code:java}
logger.KafkaPartitionSplitReader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
logger.KafkaPartitionSplitReader.level = INFO{code}
Oddly, with this change the exception described below no longer occurs.
Reporter: Abdul

If the Kafka topic (or any of its partitions) is empty in batch mode, processing fails with an exception. This bug was supposedly fixed, but the exception still occurs. The original bug was reported as https://issues.apache.org/jira/browse/FLINK-27041. We tried to backport that fix, but the problem persists.

* The problem occurs when the logger for class org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader is set to DEBUG level.
* The same problem occurs in other Flink versions, at least on the 1.15 release branch and tag release-1.15.4.
* The same problem also occurs in Flink 1.17.1 and 1.14.

Minimal code to reproduce:
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name is arbitrary; any main class works.
public class KafkaSourceBatchTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job in bounded (batch) execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test_topic")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop at the latest offsets, so an empty partition has nothing to read.
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source");

        stream.print();
        env.execute("Flink KafkaSource test job");
    }
}
{code}

This produces the following exception:
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1737)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1704)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.maybeLogSplitChangesHandlingResult(KafkaPartitionSplitReader.java:375)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:232)
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
    ... 6 more
{code}
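Here is one possible reading of the trace, which has not been confirmed. With DEBUG enabled, `maybeLogSplitChangesHandlingResult` calls `KafkaConsumer.position()` for the partitions in the split change. We assume the empty partition has already been unassigned from the consumer at that point, presumably by the empty-split handling added for FLINK-27041. If so, `position()` throws `IllegalStateException`, which would also explain why switching the logger to INFO avoids the error. The self-contained sketch below models that assumption. `FakeConsumer`, `logPositions` and `logPositionsUnguarded` are hypothetical stand-ins, not Kafka or Flink APIs. `FakeConsumer.position()` only mimics the precondition shown in the trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the failure; none of these names are real Flink or Kafka APIs.
public class PositionGuardSketch {

    // Mimics the precondition of KafkaConsumer.position(): only assigned partitions have a position.
    static class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();

        void assign(String partition, long offset) {
            positions.put(partition, offset);
        }

        void unassign(String partition) {
            positions.remove(partition);
        }

        Set<String> assignment() {
            return new HashSet<>(positions.keySet());
        }

        long position(String partition) {
            Long offset = positions.get(partition);
            if (offset == null) {
                throw new IllegalStateException(
                        "You can only check the position for partitions assigned to this consumer.");
            }
            return offset;
        }
    }

    // Unguarded: queries every partition of the split change (assumed to match the DEBUG path in the trace).
    static List<String> logPositionsUnguarded(FakeConsumer consumer, List<String> splitPartitions) {
        List<String> lines = new ArrayList<>();
        for (String tp : splitPartitions) {
            lines.add(tp + " starting at offset " + consumer.position(tp));
        }
        return lines;
    }

    // Guarded: only calls position() for partitions that are still assigned.
    static List<String> logPositions(FakeConsumer consumer, List<String> splitPartitions) {
        Set<String> assigned = consumer.assignment();
        List<String> lines = new ArrayList<>();
        for (String tp : splitPartitions) {
            if (assigned.contains(tp)) {
                lines.add(tp + " starting at offset " + consumer.position(tp));
            } else {
                lines.add(tp + " is not assigned (finished or empty)");
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assign("test_topic-0", 0L);
        consumer.assign("test_topic-1", 0L);
        // Assumption: the empty partition is unassigned before the DEBUG logging runs.
        consumer.unassign("test_topic-1");
        List<String> split = Arrays.asList("test_topic-0", "test_topic-1");

        logPositions(consumer, split).forEach(System.out::println);
        try {
            logPositionsUnguarded(consumer, split);
        } catch (IllegalStateException e) {
            System.out.println("Unguarded logging failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints a position line for `test_topic-0`, a "not assigned" line for `test_topic-1`, and then the `IllegalStateException` message from the unguarded variant. If the assumption holds, either of two changes should let DEBUG logging work without the exception. One is a guard like this, which checks the consumer's assignment before calling `position()`. The other is to log before empty splits are unassigned.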