[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28185: --- Fix Version/s: 1.17.0 (was: 1.18.0) > "Invalid negative offset" when using OffsetsInitializer.timestamp(.) > > > Key: FLINK-28185 > URL: https://issues.apache.org/jira/browse/FLINK-28185 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Flink 1.15.0 > Kafka 2.8.1 >Reporter: Peter Schrott >Assignee: Mason Chen >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0, 1.16.3 > > Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, > NegativeOffsetSpec.scala > > > When using the {{OffsetsInitializer.timestamp(.)}} on a topic with empty > partitions – little traffice + low retention – an {{IllegalArgumentException: > Invalid negative offset}} occures. See stracktrace below. > The problem here is, that the admin client returns -1 as timestamps and > offset for empty partitions in {{{}KafkaAdminClient.listOffsets(.){}}}. [1] > Please compare the attached screenshot. When creating {{OffsetAndTimestamp}} > object from the admin client response the exception is thrown. > {code:java} > org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition > splits due to > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) > at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) > at java.util.concurrent.FutureTask.run(FutureTask.java) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) > at > org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) > ... 8 common frames omitted > 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] > o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator > 351e440289835f2ff3e6fee31bf6e13c). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) > at >
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28185: --- Fix Version/s: 1.16.3 > "Invalid negative offset" when using OffsetsInitializer.timestamp(.) > > > Key: FLINK-28185 > URL: https://issues.apache.org/jira/browse/FLINK-28185 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Flink 1.15.0 > Kafka 2.8.1 >Reporter: Peter Schrott >Assignee: Mason Chen >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0, 1.16.3 > > Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, > NegativeOffsetSpec.scala > > > When using the {{OffsetsInitializer.timestamp(.)}} on a topic with empty > partitions – little traffice + low retention – an {{IllegalArgumentException: > Invalid negative offset}} occures. See stracktrace below. > The problem here is, that the admin client returns -1 as timestamps and > offset for empty partitions in {{{}KafkaAdminClient.listOffsets(.){}}}. [1] > Please compare the attached screenshot. When creating {{OffsetAndTimestamp}} > object from the admin client response the exception is thrown. > {code:java} > org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition > splits due to > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) > at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) > at java.util.concurrent.FutureTask.run(FutureTask.java) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) > at > org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) > ... 8 common frames omitted > 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] > o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator > 351e440289835f2ff3e6fee31bf6e13c). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) > at >
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-28185: - Fix Version/s: 1.18.0 (was: 1.17.0) > "Invalid negative offset" when using OffsetsInitializer.timestamp(.) > > > Key: FLINK-28185 > URL: https://issues.apache.org/jira/browse/FLINK-28185 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Flink 1.15.0 > Kafka 2.8.1 >Reporter: Peter Schrott >Assignee: Mason Chen >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, > NegativeOffsetSpec.scala > > > When using the {{OffsetsInitializer.timestamp(.)}} on a topic with empty > partitions – little traffice + low retention – an {{IllegalArgumentException: > Invalid negative offset}} occures. See stracktrace below. > The problem here is, that the admin client returns -1 as timestamps and > offset for empty partitions in {{{}KafkaAdminClient.listOffsets(.){}}}. [1] > Please compare the attached screenshot. When creating {{OffsetAndTimestamp}} > object from the admin client response the exception is thrown. > {code:java} > org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition > splits due to > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) > at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) > at java.util.concurrent.FutureTask.run(FutureTask.java) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) > at > org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) > ... 8 common frames omitted > 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] > o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator > 351e440289835f2ff3e6fee31bf6e13c). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) > at >
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28185: --- Labels: pull-request-available (was: ) > "Invalid negative offset" when using OffsetsInitializer.timestamp(.) > > > Key: FLINK-28185 > URL: https://issues.apache.org/jira/browse/FLINK-28185 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0 > Environment: Flink 1.15.0 > Kafka 2.8.1 >Reporter: Peter Schrott >Assignee: Mason Chen >Priority: Minor > Labels: pull-request-available > Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, > NegativeOffsetSpec.scala > > > When using the {{OffsetsInitializer.timestamp(.)}} on a topic with empty > partitions – little traffice + low retention – an {{IllegalArgumentException: > Invalid negative offset}} occures. See stracktrace below. > The problem here is, that the admin client returns -1 as timestamps and > offset for empty partitions in {{{}KafkaAdminClient.listOffsets(.){}}}. [1] > Please compare the attached screenshot. When creating {{OffsetAndTimestamp}} > object from the admin client response the exception is thrown. > {code:java} > org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition > splits due to > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) > at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) > at java.util.concurrent.FutureTask.run(FutureTask.java) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) > at > org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) > at > org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) > ... 8 common frames omitted > 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] > o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator > 351e440289835f2ff3e6fee31bf6e13c). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) > at >
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
Title: Message Title Arseniy Tashoyan updated an issue Flink / FLINK-28185 "Invalid negative offset" when using OffsetsInitializer.timestamp(.) Change By: Arseniy Tashoyan Attachment: NegativeOffsetSpec.scala Add Comment This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)
[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Schrott updated FLINK-28185: -- Description: When using the {{OffsetsInitializer.timestamp(.)}} on a topic with empty partitions – little traffice + low retention – an {{IllegalArgumentException: Invalid negative offset}} occures. See stracktrace below. The problem here is, that the admin client returns -1 as timestamps and offset for empty partitions in {{{}KafkaAdminClient.listOffsets(.){}}}. [1] Please compare the attached screenshot. When creating {{OffsetAndTimestamp}} object from the admin client response the exception is thrown. {code:java} org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition splits due to at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.IllegalArgumentException: Invalid negative offset at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36) at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622) at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615) at org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57) at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272) at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ... 8 common frames omitted 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure. org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 351e440289835f2ff3e6fee31bf6e13c). at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at