Danny Cranmer created FLINK-20088:
-------------------------------------

             Summary: [Kinesis][Polling] Issue using Polling consumer at 
timestamp with empty shard
                 Key: FLINK-20088
                 URL: https://issues.apache.org/jira/browse/FLINK-20088
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kinesis
            Reporter: Danny Cranmer
             Fix For: 1.12.0


*Background*

The consumer fails when a Polling record publisher uses a timestamp sentinel 
starting position and the first record batch is empty. This is because the 
consumer tries to recalculate the start position from the timestamp sentinel, 
which is not a supported operation.

*Reproduction Steps*

Set up an application consuming from Kinesis with the following properties and 
consume from an empty shard:
{code:java}
String format = "yyyy-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());

consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP"); {code}
*Error*
{code:java}
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
 at akka.dispatch.OnComplete.internal(Future.scala:264) at 
akka.dispatch.OnComplete.internal(Future.scala:261) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at 
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) 
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at 
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at 
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused
 by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at 
akka.actor.Actor$class.aroundReceive(Actor.scala) at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at 
akka.actor.ActorCell.invoke(ActorCell.scala:561) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 moreCaused by: 
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM at 
software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:107)
 at 
software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSequenceNumber(StartingPosition.java:90)
 at 
software.amazon.kinesis.connectors.flink.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:73)
 at 
software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:113)
 at 
software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:98)
 at 
software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:108)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at 
java.util.concurrent.FutureTask.run(FutureTask.java) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position when the 
first record batch is empty, instead of recalculating it from the sentinel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to