[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2023-06-05 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28185:
---
Fix Version/s: 1.17.0
   (was: 1.18.0)

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> 
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Flink 1.15.0, Kafka 2.8.1
> Reporter: Peter Schrott
> Assignee: Mason Chen
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.17.0, 1.16.3
>
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, 
> NegativeOffsetSpec.scala
>
>
> When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty 
> partitions – little traffic + low retention – an {{IllegalArgumentException: 
> Invalid negative offset}} occurs. See the stack trace below.
> The problem is that the admin client returns -1 as both timestamp and 
> offset for empty partitions in {{KafkaAdminClient.listOffsets(.)}}. [1] 
> Please compare the attached screenshot. The exception is thrown when an 
> {{OffsetAndTimestamp}} object is created from the admin client response.
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition 
> splits due to 
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
>     at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>     at 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
>     ... 8 common frames omitted
> 15:25:58.025 INFO  [flink-akka.actor.default-dispatcher-11] 
> o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 
> 351e440289835f2ff3e6fee31bf6e13c).
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
>     at 
> 

[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2023-06-05 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28185:
---
Fix Version/s: 1.16.3

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> 
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Flink 1.15.0, Kafka 2.8.1
> Reporter: Peter Schrott
> Assignee: Mason Chen
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3
>
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, 
> NegativeOffsetSpec.scala
>
>

[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2023-03-23 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-28185:
-
Fix Version/s: 1.18.0
   (was: 1.17.0)

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> 
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Flink 1.15.0, Kafka 2.8.1
> Reporter: Peter Schrott
> Assignee: Mason Chen
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, 
> NegativeOffsetSpec.scala
>
>

[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2022-07-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28185:
---
Labels: pull-request-available  (was: )

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> 
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Flink 1.15.0, Kafka 2.8.1
> Reporter: Peter Schrott
> Assignee: Mason Chen
> Priority: Minor
> Labels: pull-request-available
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, 
> NegativeOffsetSpec.scala
>
>

[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2022-06-28 Thread Arseniy Tashoyan (Jira)


Arseniy Tashoyan updated an issue

Flink / FLINK-28185
"Invalid negative offset" when using OffsetsInitializer.timestamp(.)

Change By: Arseniy Tashoyan
Attachment: NegativeOffsetSpec.scala

This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)


[jira] [Updated] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)

2022-06-21 Thread Peter Schrott (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Schrott updated FLINK-28185:
--
Description: 
When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty 
partitions – little traffic + low retention – an {{IllegalArgumentException: 
Invalid negative offset}} occurs. See the stack trace below.

The problem is that the admin client returns -1 as both timestamp and offset 
for empty partitions in {{KafkaAdminClient.listOffsets(.)}}. [1] Please 
compare the attached screenshot. The exception is thrown when an 
{{OffsetAndTimestamp}} object is created from the admin client response.
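
To illustrate the failure mode described above without a Kafka dependency, here is a minimal, self-contained sketch. The classes {{ListedOffset}} and {{CheckedOffset}} and both {{convert...}} methods are hypothetical stand-ins invented for this example; they are not Flink or Kafka APIs. {{CheckedOffset}} only imitates the negative-offset check whose message appears in the stack trace. The guarded variant shows one possible way to handle the -1 sentinel (skipping such partitions); it is an assumption and not necessarily how the connector handles it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NegativeOffsetSketch {

    /** Hypothetical stand-in for one per-partition entry of a listOffsets response. */
    public static final class ListedOffset {
        public final long offset;
        public final long timestamp;

        public ListedOffset(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in that rejects negative offsets, as seen in the trace. */
    public static final class CheckedOffset {
        public final long offset;
        public final long timestamp;

        public CheckedOffset(long offset, long timestamp) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Converts every entry without a guard; fails on the first -1 offset. */
    public static Map<String, CheckedOffset> convertUnguarded(Map<String, ListedOffset> listed) {
        Map<String, CheckedOffset> result = new LinkedHashMap<>();
        for (Map.Entry<String, ListedOffset> entry : listed.entrySet()) {
            ListedOffset value = entry.getValue();
            result.put(entry.getKey(), new CheckedOffset(value.offset, value.timestamp));
        }
        return result;
    }

    /** Skips entries with a negative offset, i.e. partitions reported as empty. */
    public static Map<String, CheckedOffset> convertGuarded(Map<String, ListedOffset> listed) {
        Map<String, CheckedOffset> result = new LinkedHashMap<>();
        for (Map.Entry<String, ListedOffset> entry : listed.entrySet()) {
            ListedOffset value = entry.getValue();
            if (value.offset < 0) {
                continue; // -1 sentinel: no record matched the timestamp
            }
            result.put(entry.getKey(), new CheckedOffset(value.offset, value.timestamp));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ListedOffset> listed = new LinkedHashMap<>();
        listed.put("topic-0", new ListedOffset(42L, 1655817898000L));
        listed.put("topic-1", new ListedOffset(-1L, -1L)); // empty partition

        try {
            convertUnguarded(listed);
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
        System.out.println("guarded keys: " + convertGuarded(listed).keySet());
    }
}
```

In this sketch the caller of the guarded variant still has to choose a starting offset for the skipped partitions; that choice is outside the scope of the example.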
{code:java}
org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition 
splits due to 
    at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
    at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
    at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
    at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
    at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
    at 
org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
    at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
    at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
    at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
    ... 8 common frames omitted
15:25:58.025 INFO  [flink-akka.actor.default-dispatcher-11] 
o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 
351e440289835f2ff3e6fee31bf6e13c).
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329)
    at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at