[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120236128

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
+ /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --

Got it now. I read your new comment.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120236059

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --

Ah, alright, so you're asking for another `initialPositionInStreamTimestamp`-style setting. That's similar to `withInitialPositionAtTimestamp`; I can rename that to suit this purpose.

Another question: the InitialPosition gets passed to the KinesisReceiver, and at the moment I pass a timestamp along with the initial position. Are we planning to pass the `KinesisClientLibConfiguration` to the `KinesisReceiver` now?

Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120235938

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
+ /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --

I just suggested renaming it. Sorry for the confusion.

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120235619

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
+ /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --

@brkyvz `withInitialPositionAtTimestamp` is an enhancer method for the AT_TIMESTAMP initial position. If provided, it sets the timestamp value along with `InitialPositionInStream.AT_TIMESTAMP`. It's optional, so `initialPositionInStream` can still be used on its own. This will not introduce any incompatibilities in usage. Thoughts?
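The enhancer-method design described in this comment can be sketched as a builder whose timestamp setter also records `AT_TIMESTAMP`. This is a minimal, self-contained sketch under stated assumptions: `BuilderSketch` is hypothetical and is not Spark's `KinesisInputDStream.Builder`, and the local `InitialPositionInStream` enumeration is a stand-in for the KCL enum so the code compiles without the KCL on the classpath.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL enum of the same name.
object InitialPositionInStream extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// Hypothetical builder sketch; not Spark's actual KinesisInputDStream.Builder.
class BuilderSketch {
  private var initialPosition: Option[InitialPositionInStream.Value] = None
  private var initialTimestamp: Option[Date] = None

  // Existing-style setter: callers that never need a timestamp keep using it.
  // It clears any earlier timestamp so a stale value cannot pair with LATEST.
  def initialPositionInStream(position: InitialPositionInStream.Value): BuilderSketch = {
    initialPosition = Some(position)
    initialTimestamp = None
    this
  }

  // Optional "enhancer": sets AT_TIMESTAMP and the timestamp in one call.
  def withInitialPositionAtTimestamp(timestamp: Date): BuilderSketch = {
    initialPosition = Some(InitialPositionInStream.AT_TIMESTAMP)
    initialTimestamp = Some(timestamp)
    this
  }

  def position: Option[InitialPositionInStream.Value] = initialPosition
  def timestamp: Option[Date] = initialTimestamp
}
```

In this sketch whichever setter is called last wins; that is an assumed design choice, not something the PR specifies.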

Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120234538

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --

I'm hoping we won't have to take both `initialPositionInStream` and `initialPositionInStreamTimestamp`. The builder is an internal API, so we can definitely change it.

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120234200

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --

@brkyvz Where exactly are we planning to add these changes? Are you proposing to change the type of `private var initialPositionInStreamTimestamp: Option[Date] = None`? Wouldn't that introduce a backward incompatibility in the current builder?

Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120180756

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --

Umm, I feel a better way would be something like:

```scala
import java.util.Date

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}

// A single value describing where to start reading; each variant knows
// how to apply itself to the KCL configuration.
sealed trait InitialPosition {
  def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration
}

case object Latest extends InitialPosition {
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration =
    clientConf.withInitialPositionInStream(InitialPositionInStream.LATEST)
}

case object TrimHorizon extends InitialPosition {
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration =
    clientConf.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
}

case class AtTimestamp(timestamp: Date) extends InitialPosition {
  // Sets AT_TIMESTAMP together with the timestamp to resume from.
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration =
    clientConf.withTimestampAtInitialPositionInStream(timestamp)
}
```

What do you think?
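The proposal above replaces the pair of fields with one value that applies itself to the client configuration. The following self-contained sketch exercises that shape without the KCL; `ClientConfSketch` is a hypothetical immutable stand-in for `KinesisClientLibConfiguration`, and positions are plain strings purely for brevity.

```scala
import java.util.Date

// Hypothetical, simplified stand-in for KinesisClientLibConfiguration:
// each `with...` method returns an updated copy.
final case class ClientConfSketch(
    initialPosition: String = "LATEST",
    timestamp: Option[Date] = None) {
  def withInitialPositionInStream(position: String): ClientConfSketch =
    copy(initialPosition = position, timestamp = None)
  def withTimestampAtInitialPositionInStream(ts: Date): ClientConfSketch =
    copy(initialPosition = "AT_TIMESTAMP", timestamp = Some(ts))
}

// One value covers every valid starting position, so a timestamp can
// never be passed alongside LATEST or TRIM_HORIZON.
sealed trait InitialPosition {
  def setInitialPosition(conf: ClientConfSketch): ClientConfSketch
}

case object Latest extends InitialPosition {
  override def setInitialPosition(conf: ClientConfSketch): ClientConfSketch =
    conf.withInitialPositionInStream("LATEST")
}

case object TrimHorizon extends InitialPosition {
  override def setInitialPosition(conf: ClientConfSketch): ClientConfSketch =
    conf.withInitialPositionInStream("TRIM_HORIZON")
}

final case class AtTimestamp(timestamp: Date) extends InitialPosition {
  override def setInitialPosition(conf: ClientConfSketch): ClientConfSketch =
    conf.withTimestampAtInitialPositionInStream(timestamp)
}
```

Because the trait is sealed, a `match` over `InitialPosition` values can be checked for exhaustiveness by the compiler if a new variant is added later.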

Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120180952

--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -111,5 +111,29 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+val yesterday = DateUtils.addDays(new Date, -1)
+val dStreamFromTimestamp = builder
+.endpointUrl(customEndpointUrl)
--- End diff --

Please indent all these lines.

Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r120181290

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
+ /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --

`withInitialPositionAtTimestamp`?

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119986035

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -45,7 +46,7 @@ public void testJavaKinesisDStreamBuilder() {
 .streamName(streamName)
 .endpointUrl(endpointUrl)
 .regionName(region)
- .initialPositionInStream(initialPosition)
+ .initialPositionInStream(initialPosition, scala.Option.apply(null))
--- End diff --

@budde Not having the overloaded methods introduces this backward-compatibility issue, which I don't much like. What are your thoughts on this?
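The Java call site above must spell out `scala.Option.apply(null)` because Java callers cannot omit a Scala parameter, even one with a default value; Java only sees the full two-parameter method. A minimal sketch of the overload approach, assuming a hypothetical `OverloadedBuilderSketch` and a stand-in enumeration for the KCL enum: the original one-argument method stays, and a second overload accepts a plain `java.util.Date`.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL enum of the same name.
object InitialPositionInStream extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

class OverloadedBuilderSketch {
  private var position: Option[InitialPositionInStream.Value] = None
  private var timestamp: Option[Date] = None

  // Original one-argument signature, so existing Java and Scala call sites
  // such as `.initialPositionInStream(initialPosition)` keep compiling.
  def initialPositionInStream(p: InitialPositionInStream.Value): OverloadedBuilderSketch = {
    position = Some(p)
    timestamp = None
    this
  }

  // Additional overload: callers pass a Date directly instead of an Option.
  def initialPositionInStream(p: InitialPositionInStream.Value, ts: Date): OverloadedBuilderSketch = {
    position = Some(p)
    timestamp = Option(ts) // tolerate a null coming from Java
    this
  }

  def currentPosition: Option[InitialPositionInStream.Value] = position
  def currentTimestamp: Option[Date] = timestamp
}
```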

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119986280

--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -111,5 +110,28 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+val yesterday = DateUtils.addDays(new Date, -1)
+val dStreamFromTimestamp = builder
+.endpointUrl(customEndpointUrl)
+.regionName(customRegion)
+.initialPositionInStream(InitialPositionInStream.AT_TIMESTAMP, Some(yesterday))
--- End diff --

@budde I added an optional timestamp to resume from, but passing it as `Some(...)` doesn't feel very natural. Passing a date directly seems more intuitive. Thoughts?

Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119984045

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -38,6 +40,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
 val endpointUrl: String,
 val regionName: String,
 val initialPositionInStream: InitialPositionInStream,
+val initialPositionInStreamTimestamp: Date,
--- End diff --

@budde I had two approaches in mind while adding this functionality:

1. An additional parameter that can be set by an overloaded method in the Builder.
2. A new case class that wraps the initial position with an optional timestamp.

I went ahead with the first one for backward compatibility, so that users can keep using their existing builders.
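Approach 2 from this comment might look roughly like the sketch below. `KinesisInitialPositionSketch` is a hypothetical name (the PR description names its class `KinesisInitialPositionInStream`, whose definition is not shown here), the enumeration is a stand-in for the KCL enum, and the `require` rule about which combinations are invalid is an assumption.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL enum of the same name.
object InitialPositionInStream extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// Hypothetical wrapper: one value carries the position and, only for
// AT_TIMESTAMP, the timestamp to start reading from.
final case class KinesisInitialPositionSketch(
    position: InitialPositionInStream.Value,
    timestamp: Option[Date] = None) {
  // Reject a missing timestamp for AT_TIMESTAMP and a stray one otherwise.
  require(
    (position == InitialPositionInStream.AT_TIMESTAMP) == timestamp.isDefined,
    "a timestamp must be given for AT_TIMESTAMP, and only for AT_TIMESTAMP")
}
```

With this shape the builder and receiver would carry a single field instead of two, at the cost of changing existing signatures.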

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119725220

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -73,7 +73,7 @@ object KinesisUtils {
 // Setting scope to override receiver stream's scope of "receiver stream"
 ssc.withNamedScope("kinesis stream") {
 new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
-initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+initialPositionInStream, null, kinesisAppName, checkpointInterval, storageLevel,
--- End diff --

Passing `null` for an optional value is very unidiomatic in Scala code. As noted in my previous comment, we should find a way to pass a single config option that represents all possible InitialPositionInStream configurations, or at the very least make this argument an `Option[Date]` with a default value of `None`.
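A sketch of the `Option[Date]` alternative suggested here, using a hypothetical `ReceiverParamsSketch` class and `fromJava` helper: Scala callers simply omit the argument and get `None`, while a possibly-null value arriving from Java is wrapped once with `Option(...)`, which yields `None` for `null`.

```scala
import java.util.Date

// Hypothetical parameter holder: the timestamp is an Option defaulting to
// None, so call sites that don't need it leave it out instead of passing null.
final class ReceiverParamsSketch(
    val streamName: String,
    val initialTimestamp: Option[Date] = None)

// Hypothetical boundary helper: Option(null) is None, Option(d) is Some(d).
def fromJava(streamName: String, nullableTimestamp: Date): ReceiverParamsSketch =
  new ReceiverParamsSketch(streamName, Option(nullableTimestamp))
```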

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119724818

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -84,6 +84,7 @@ private[kinesis] class KinesisReceiver[T](
 endpointUrl: String,
 regionName: String,
 initialPositionInStream: InitialPositionInStream,
+initialPositionInStreamTimestamp: Date,
--- End diff --

See my previous comment in **KinesisInputDStream.scala** about these parameters.

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119723361

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -57,6 +58,26 @@ public void testJavaKinesisDStreamBuilder() {
 assert(kinesisDStream.checkpointAppName() == appName);
 assert(kinesisDStream.checkpointInterval() == checkpointInterval);
 assert(kinesisDStream._storageLevel() == storageLevel);
+
+Date yesterday = DateUtils.addDays(new Date(), -1);
+KinesisInputDStream kinesisDStreamFromTimestamp = KinesisInputDStream.builder()
+.streamingContext(ssc)
+.streamName(streamName)
+.endpointUrl(endpointUrl)
+.regionName(region)
+.initialPositionInStream(yesterday)
+.checkpointAppName(appName)
+.checkpointInterval(checkpointInterval)
+.storageLevel(storageLevel)
+.build();
--- End diff --

Indentation is inconsistent with lines 45-53.

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119723865

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -148,17 +149,31 @@ private[kinesis] class KinesisReceiver[T](
 kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
- checkpointAppName,
- streamName,
- kinesisProvider,
- dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
- cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
- workerId)
-.withKinesisEndpoint(endpointUrl)
-.withInitialPositionInStream(initialPositionInStream)
-.withTaskBackoffTimeMillis(500)
-.withRegionName(regionName)
+
+val kinesisClientLibConfiguration = {
+ var clientLibConf = new KinesisClientLibConfiguration(
+checkpointAppName,
+streamName,
+kinesisProvider,
+dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+workerId)
+ .withKinesisEndpoint(endpointUrl)
+ .withTaskBackoffTimeMillis(500)
+ .withRegionName(regionName)
+
+ /** Enhance the Kinesis receiver based on InitialPositionInStream */
--- End diff --

I'd use the standard `// comment` syntax here instead of JavaDoc, as you aren't documenting a class or method.
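One way to pick the initial-position call without a `var`, and with plain `//` comments, is a single `match` expression. The sketch below assumes a hypothetical immutable `ConfSketch` in place of `KinesisClientLibConfiguration` and a stand-in enumeration; throwing for `AT_TIMESTAMP` without a timestamp, and ignoring a timestamp paired with another position, are assumed behaviors rather than what the PR does.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL enum of the same name.
object InitialPositionInStream extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// Hypothetical immutable stand-in for KinesisClientLibConfiguration.
final case class ConfSketch(
    endpoint: String,
    position: InitialPositionInStream.Value = InitialPositionInStream.LATEST,
    timestamp: Option[Date] = None) {
  def withInitialPositionInStream(p: InitialPositionInStream.Value): ConfSketch =
    copy(position = p)
  def withTimestampAtInitialPositionInStream(ts: Date): ConfSketch =
    copy(position = InitialPositionInStream.AT_TIMESTAMP, timestamp = Some(ts))
}

def buildConf(
    endpoint: String,
    position: InitialPositionInStream.Value,
    timestamp: Option[Date]): ConfSketch = {
  val base = ConfSketch(endpoint)
  // Choose how to set the initial position without reassigning a var.
  (position, timestamp) match {
    case (InitialPositionInStream.AT_TIMESTAMP, Some(ts)) =>
      base.withTimestampAtInitialPositionInStream(ts)
    case (InitialPositionInStream.AT_TIMESTAMP, None) =>
      throw new IllegalArgumentException("AT_TIMESTAMP requires a timestamp")
    case (other, _) =>
      // Any timestamp supplied with LATEST or TRIM_HORIZON is ignored here.
      base.withInitialPositionInStream(other)
  }
}
```

Since the KCL `with...` methods return the configuration object, the same `match` can produce the final `val` directly in the receiver code.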

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119724691

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -38,6 +40,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
 val endpointUrl: String,
 val regionName: String,
 val initialPositionInStream: InitialPositionInStream,
+val initialPositionInStreamTimestamp: Date,
--- End diff --

I think there needs to be a better abstraction around the `initialPositionInStream` and `initialPositionInStreamTimestamp` options. Providing both is redundant, as a user would want to specify one or the other. Additionally, if `initialPositionInStreamTimestamp` is an optional value, then its type should at the very least be `Option[Date]` with a default value of `None`.

Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r119723184

--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -112,4 +112,38 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
 }
+
+ test("should propagate kinesis fetch timestamp values to KinesisInputDStream") {
--- End diff --

Why not just roll this into the previous check? There's a lot of code duplication going on for no other reason than to check that another config value is passed properly.

GitHub user yssharma opened a pull request:

https://github.com/apache/spark/pull/18029

[SPARK-20168][WIP][DStream] Add changes to use kinesis fetches from specified timestamp

## What changes were proposed in this pull request?

The Kinesis client can resume from a specified timestamp while creating a stream. We should have an option to pass a timestamp in the config so that Kinesis can resume from the given timestamp.

The patch introduces a new `KinesisInitialPositionInStream` that combines the `InitialPositionInStream` with the `timestamp` information, which can be used to resume Kinesis fetches from the provided timestamp.

## How was this patch tested?

todo

cc: @budde @brkyvz

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yssharma/spark ysharma/kcl_resume

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18029.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #18029