Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850792 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T]( kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) val kinesisProvider = kinesisCreds.provider - val kinesisClientLibConfiguration = new KinesisClientLibConfiguration( - checkpointAppName, - streamName, - kinesisProvider, - dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), - cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), - workerId) + + val kinesisClientLibConfiguration = { + val baseClientLibConfiguration = new KinesisClientLibConfiguration( + checkpointAppName, + streamName, + kinesisProvider, + dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), + cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), + workerId) .withKinesisEndpoint(endpointUrl) - .withInitialPositionInStream(initialPositionInStream) + .withInitialPositionInStream(initialPosition.initialPositionInStream) .withTaskBackoffTimeMillis(500) .withRegionName(regionName) + // Update the Kinesis client lib config with timestamp + // if InitialPositionInStream.AT_TIMESTAMP is passed + initialPosition match { + case atTimestamp: AtTimestamp => --- End diff -- nit: ```scala initialPosition match { case AtTimestamp(ts) => baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts) ... } ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org