Repository: spark
Updated Branches:
  refs/heads/master e6c6f90a5 -> 9fa4a1ed3


[SPARK-20168][STREAMING KINESIS] Setting the timestamp directly would cause 
exception on …

Setting the timestamp directly causes an exception when reading the stream. The
initial position can be set directly only if the mode is not AT_TIMESTAMP.

## What changes were proposed in this pull request?

The last patch to the Kinesis streaming receiver sets the timestamp for the
AT_TIMESTAMP mode, but this mode can only be set via

`baseClientLibConfiguration.withTimestampAtInitialPositionInStream()`

and can't be set directly using
`.withInitialPositionInStream()`

This patch fixes the issue by calling `.withInitialPositionInStream()` only
when the mode is not AT_TIMESTAMP.
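
The dispatch described above can be sketched in isolation. This is a minimal, self-contained model and not the actual receiver code: `Position`, `ClientConfig`, `LatestPos`, `TrimHorizonPos`, and `ConfigSketch` are hypothetical stand-ins for the client library and Spark types, and the assumption that the plain setter throws for the timestamp mode is taken from the behavior this commit message reports.

```scala
import java.util.Date

// Hypothetical stand-ins for the client library's position enum (not the real classes).
sealed trait Position
case object Latest extends Position
case object TrimHorizon extends Position
case object AtTimestampPosition extends Position

// Simplified model of the client library configuration (assumption):
// the plain setter rejects the timestamp mode, as the commit message describes.
final case class ClientConfig(position: Position = Latest, timestamp: Option[Date] = None) {
  def withInitialPositionInStream(p: Position): ClientConfig = p match {
    case AtTimestampPosition =>
      throw new IllegalArgumentException("AT_TIMESTAMP requires a timestamp")
    case other => copy(position = other, timestamp = None)
  }

  def withTimestampAtInitialPositionInStream(ts: Date): ClientConfig =
    copy(position = AtTimestampPosition, timestamp = Some(ts))
}

// Hypothetical stand-ins for the receiver's initial position wrappers.
sealed trait InitialPosition { def getPosition: Position }
case object LatestPos extends InitialPosition { def getPosition: Position = Latest }
case object TrimHorizonPos extends InitialPosition { def getPosition: Position = TrimHorizon }
final case class AtTimestamp(getTimestamp: Date) extends InitialPosition {
  def getPosition: Position = AtTimestampPosition
}

object ConfigSketch {
  // Mirrors the patched match: the timestamp setter for AtTimestamp,
  // the plain position setter for every other mode.
  def configure(base: ClientConfig, initial: InitialPosition): ClientConfig =
    initial match {
      case ts: AtTimestamp => base.withTimestampAtInitialPositionInStream(ts.getTimestamp)
      case _ => base.withInitialPositionInStream(initial.getPosition)
    }
}
```

In this model, calling `withInitialPositionInStream` unconditionally (as the code did before this patch) would fail for `AtTimestamp`, while `ConfigSketch.configure` only reaches the plain setter for the non-timestamp modes.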

## How was this patch tested?
Kinesis Receiver doesn't expose its internal state, so I couldn't find the
right way to test this change. Seeking tips from other contributors here.

Author: Yash Sharma <ysha...@atlassian.com>

Closes #21541 from yashs360/ysharma/fix_kinesis_bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fa4a1ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fa4a1ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fa4a1ed

Branch: refs/heads/master
Commit: 9fa4a1ed38713e2d18a3320d3fc56f9f6db07b06
Parents: e6c6f90
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Thu Jul 12 10:04:47 2018 -0700
Committer: Sean Owen <sro...@gmail.com>
Committed: Thu Jul 12 10:04:47 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fa4a1ed/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index fa0de62..69c5236 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -160,7 +160,6 @@ private[kinesis] class KinesisReceiver[T](
         cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
         workerId)
         .withKinesisEndpoint(endpointUrl)
-        .withInitialPositionInStream(initialPosition.getPosition)
         .withTaskBackoffTimeMillis(500)
         .withRegionName(regionName)
 
@@ -169,7 +168,8 @@ private[kinesis] class KinesisReceiver[T](
       initialPosition match {
         case ts: AtTimestamp =>
           baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
-        case _ => baseClientLibConfiguration
+        case _ =>
+          baseClientLibConfiguration.withInitialPositionInStream(initialPosition.getPosition)
       }
     }
 


