[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18029


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627994
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
+  .initialPosition(initialPosition)
   .checkpointAppName(appName)
   .checkpointInterval(checkpointInterval)
   .storageLevel(storageLevel)
   .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == 
initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+  }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com;;
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream kinesisDStream = 
KinesisInputDStream.builder()
+.streamingContext(ssc)
+.streamName(streamName)
+.endpointUrl(endpointUrl)
+.regionName(region)
+.initialPositionInStream(InitialPositionInStream.LATEST)
+.checkpointAppName(appName)
+.checkpointInterval(checkpointInterval)
+.storageLevel(storageLevel)
+.build();
+assert(kinesisDStream.streamName() == streamName);
+assert(kinesisDStream.endpointUrl() == endpointUrl);
+assert(kinesisDStream.regionName() == region);
+assert(kinesisDStream.initialPosition().getPosition() == 
InitialPositionInStream.LATEST);
 assert(kinesisDStream.checkpointAppName() == appName);
 assert(kinesisDStream.checkpointInterval() == checkpointInterval);
 assert(kinesisDStream._storageLevel() == storageLevel);
 ssc.stop();
   }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * Test old API doesn't support the InitialPositionInStream.AT_TIMESTAMP.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApiAtTimestamp() {
--- End diff --

This test could be moved to become a Scala test instead, using 
```scala
intercept[UnsupportedOperationException] {
  ...
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627640
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
  * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
  * @param regionName  Region name used by the Kinesis Client Library for
  *DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
- * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+ * @param initialPosition  Instance of [[KinesisInitialPosition]]
+ * In the absence of Kinesis checkpoint 
info, this is the
--- End diff --

nit: indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627744
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * A java wrapper for exposing [[InitialPositionInStream]]
+ * to the corresponding Kinesis readers.
+ */
+interface KinesisInitialPosition {
+InitialPositionInStream getPosition();
+}
+
+public class KinesisInitialPositions {
+public static class Latest implements KinesisInitialPosition, 
Serializable {
+public Latest() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.LATEST;
+}
+}
+
+public static class TrimHorizon implements KinesisInitialPosition, 
Serializable {
+public TrimHorizon() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.TRIM_HORIZON;
+}
+}
+
+public static class AtTimestamp implements KinesisInitialPosition, 
Serializable {
+private Date timestamp;
+
+public AtTimestamp(Date timestamp) {
+this.timestamp = timestamp;
+}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.AT_TIMESTAMP;
+}
+
+public Date getTimestamp() {
+return timestamp;
+}
+}
+
+
+/**
+ * Returns instance of [[KinesisInitialPosition]] based on the passed 
[[InitialPositionInStream]].
+ * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+ * to InitialPosition. This function would be removed when we 
deprecate the KinesisUtils.
+ *
+ * @return [[InitialPosition]]
+ */
+public static KinesisInitialPosition fromKinesisInitialPosition(
+InitialPositionInStream initialPositionInStream) throws 
UnsupportedOperationException {
+if (initialPositionInStream == InitialPositionInStream.LATEST) {
+return new Latest();
+} else if (initialPositionInStream == 
InitialPositionInStream.TRIM_HORIZON) {
+return new TrimHorizon();
+} else {
+// InitialPositionInStream.AT_TIMESTAMP is not supported.
+// Use InitialPosition.atTimestamp(timestamp) instead.
+throw new UnsupportedOperationException(
+"Only InitialPositionInStream.LATEST and 
InitialPositionInStream.TRIM_HORIZON " +
+"supported in initialPositionInStream(). 
Please use the initialPosition() from " +
+"builder API in KinesisInputDStream for using 
InitialPositionInStream.AT_TIMESTAMP");
+}
+}
+}
--- End diff --

nit: new line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-24 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627941
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
+  .initialPosition(initialPosition)
   .checkpointAppName(appName)
   .checkpointInterval(checkpointInterval)
   .storageLevel(storageLevel)
   .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == 
initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+  }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com;;
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream kinesisDStream = 
KinesisInputDStream.builder()
+.streamingContext(ssc)
--- End diff --

nit: indentation should be 2 spaces.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-14 Thread yashs360
Github user yashs360 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r157086878
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

Hi @brkyvz , Thanks for the review. 
Are you suggesting to put everything into a new object. And refer the case 
objects from the java class methods?
In that case is it better to create the objects in Java and expose them 
directly, since we will have cases where we will need direct access to the case 
objects/classes (instead of the java methods) like one of the test cases:
`initialPosition.asInstanceOf[AtTimestamp].timestamp`

I would create a new branch with the changes and share with you if its fine 
?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266325
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
--- End diff --

please add `@InterfaceStability.Evolving` above


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156265997
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 ---
@@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
   .build()
 assert(dstream.endpointUrl == customEndpointUrl)
 assert(dstream.regionName == customRegion)
-assert(dstream.initialPositionInStream == customInitialPosition)
+assert(dstream.initialPosition == customInitialPosition)
 assert(dstream.checkpointAppName == customAppName)
 assert(dstream.checkpointInterval == customCheckpointInterval)
 assert(dstream._storageLevel == customStorageLevel)
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val cal = Calendar.getInstance()
+cal.add(Calendar.DATE, -1)
+val timestamp = cal.getTime()
+val initialPositionAtTimestamp = AtTimestamp(timestamp)
+
+val dstreamAtTimestamp = builder
+  .endpointUrl(customEndpointUrl)
+  .regionName(customRegion)
+  .initialPosition(initialPositionAtTimestamp)
+  .checkpointAppName(customAppName)
+  .checkpointInterval(customCheckpointInterval)
+  .storageLevel(customStorageLevel)
+  .kinesisCredentials(customKinesisCreds)
+  .dynamoDBCredentials(customDynamoDBCreds)
+  .cloudWatchCredentials(customCloudWatchCreds)
+  .build()
+assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+assert(dstreamAtTimestamp.regionName == customRegion)
+assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
+  == initialPositionAtTimestamp.initialPositionInStream)
+assert(
+  
dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp))
+assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+assert(dstreamAtTimestamp.checkpointInterval == 
customCheckpointInterval)
+assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+assert(dstreamAtTimestamp.cloudWatchCreds == 
Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val initialPositionAtTimestamp2 = AtTimestamp(timestamp)
--- End diff --

how is the following lines a different test?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-12-11 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266783
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

can you wrap all of these in an object?
```scala
sealed trait InitialPosition {
  ...
}

object internal {
  case object Latest extends InitialPosition {
  }
  ...
  case class AtTimestamp(timestamp: Date) extends InitialPosition {
  }
}
```
Note how InitialPosition is outside, and `internal` is lowercase.

so that people go only through the Java Interface 
(`org.apache.spark.streaming.kinesis.Latest()`) etc

Your documentation and test cases go through the Scala interface which 
makes it super weird to have 2 things corresponding to the same thing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-12 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r144336164
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -182,14 +182,29 @@ object KinesisInputDStream {
 
 /**
  * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
+ * [[Latest]] if no custom value is specified.
+ *
+ * @param initialPosition [[InitialPosition]] value specifying where 
Spark Streaming
+ *will start reading records in the Kinesis 
stream from
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def initialPosition(initialPosition: InitialPosition): Builder = {
+  this.initialPosition = Option(initialPosition)
+  this
+}
+
+/**
+ * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
  * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * This function would be removed when we deprecate the KinesisUtils.
  *
  * @param initialPosition InitialPositionInStream value specifying 
where Spark Streaming
  *will start reading records in the Kinesis 
stream from
  * @return Reference to this [[KinesisInputDStream.Builder]]
  */
+@deprecated("use initialPosition(initialPosition: InitialPosition)", 
"2.0.0")
--- End diff --

2.3.0


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850792
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T](
 
 kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
-  checkpointAppName,
-  streamName,
-  kinesisProvider,
-  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
-  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
-  workerId)
+
+val kinesisClientLibConfiguration = {
+  val baseClientLibConfiguration = new KinesisClientLibConfiguration(
+checkpointAppName,
+streamName,
+kinesisProvider,
+dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+workerId)
 .withKinesisEndpoint(endpointUrl)
-.withInitialPositionInStream(initialPositionInStream)
+
.withInitialPositionInStream(initialPosition.initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)
 
+  // Update the Kinesis client lib config with timestamp
+  // if InitialPositionInStream.AT_TIMESTAMP is passed
+  initialPosition match {
+case atTimestamp: AtTimestamp =>
--- End diff --

nit: 
```scala
initialPosition match {
  case AtTimestamp(ts) =>
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
...
}

```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850553
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -182,14 +181,14 @@ object KinesisInputDStream {
 
 /**
  * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
- * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * [[InitialPosition.latest]] if no custom value is specified.
  *
- * @param initialPosition InitialPositionInStream value specifying 
where Spark Streaming
+ * @param initialPosition [[InitialPosition]] value specifying where 
Spark Streaming
  *will start reading records in the Kinesis 
stream from
  * @return Reference to this [[KinesisInputDStream.Builder]]
  */
-def initialPositionInStream(initialPosition: InitialPositionInStream): 
Builder = {
--- End diff --

don't remove this API, since it will break compatibility. Instead add an 
API to take the `withTimestamp`. In the end if we see that `withTimestamp` has 
been set, but initial position isn't `AtTimestamp`, then we throw an error. 
Likewise if `AtTimestamp` is set, but no timestamp has been provided, also 
throw an error.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849596
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
--- End diff --

Can you instead return `Latest$.MODULE$`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850857
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,15 +43,15 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
--- End diff --

ditto on API change


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849691
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+ * @param timestamp
+ * @return
+ */
+public static InitialPosition AtTimestamp(Date timestamp) {
+return InitialPosition$.MODULE$.atTimestamp(timestamp);
--- End diff --

`AtTimestamp.apply(timestamp)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849634
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
--- End diff --

Can you instead return `TrimHorizon$.MODULE$`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849442
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon: InitialPosition = TrimHorizon
--- End diff --

We don't need both the Scala API for this anymore. We can just use the Java 
objects directly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850590
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -182,14 +181,14 @@ object KinesisInputDStream {
 
 /**
  * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
- * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * [[InitialPosition.latest]] if no custom value is specified.
--- End diff --

don't change docs


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-09 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137926490
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
--- End diff --

Implemented new java wrapper for the Api !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-07 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137684968
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
--- End diff --

I've implemented the functions with this Capital naming, but still feel a 
bit salty about this :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-06 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137343555
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
--- End diff --

Good question. They used to be just classes. Since we couldn't have a nice 
way for using a `case object` in Java, you need to add a $ after the classname, 
e.g. `TrimHorizon$`, we decided to go the class syntax'y way. In scala, this 
still allows you to use the class with `KinesisInitialPosition.Latest` for 
example.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-06 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137342850
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
--- End diff --

I like `InitialPosition initialPosition = 
KinesisInitialPosition.TrimHorizon();` best :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-06 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137245127
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
--- End diff --

Looks good.
How do you compare the syntax: 
`InitialPosition initialPosition = TrimHorizon.instance();`
or, introducing a new java class KinesisInitialPosition.java for:
`InitialPosition initialPosition = KinesisInitialPosition.TrimHorizon();`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-06 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137237166
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
--- End diff --

It was required for the Java Api for using `TrimHorizon.instance()`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-06 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137235013
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
--- End diff --

Interesting. Could you please explain why have we done this capitalization. 
Once() and ProcessingTime() are methods and shouldn't they be camel cased ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137086237
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon: InitialPosition = TrimHorizon
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date): InitialPosition = 
AtTimestamp(timestamp)
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+   * to InitialPosition. This function would be removed when we deprecate 
the KinesisUtils.
+   *
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
+initialPositionInStream: InitialPositionInStream): InitialPosition = {
+if (initialPositionInStream == InitialPositionInStream.LATEST) {
+  latest
+} else if (initialPositionInStream == 
InitialPositionInStream.TRIM_HORIZON) {
+  trimHorizon
+} else {
+  // InitialPositionInStream.AT_TIMESTAMP is not supported.
+  // Use InitialPosition.atTimestamp(timestamp) instead.
+  throw new UnsupportedOperationException(
+"Only InitialPositionInStream.LATEST and 
InitialPositionInStream.TRIM_HORIZON" +
+  "supported in initialPositionInStream(). Use 
InitialPosition.atTimestamp(timestamp)" +
--- End diff --

Rather, `please use the builder API in ... to use AT_TIMESTAMP` 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137086738
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
--- End diff --

For these to be Java friendly, we may need to put them in a java file, 
similar to 
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r136725034
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
--- End diff --

nit: The first letters can be capitalized. `Latest`, `TrimHorizon`, 
`AtTimestamp`... Similar to `Trigger.Once()` or `Trigger.ProcessingTime(..)` in 
Structured Streaming


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137083713
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
--- End diff --

why do you need the `instance`s?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-09-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r137085859
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon: InitialPosition = TrimHorizon
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date): InitialPosition = 
AtTimestamp(timestamp)
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+   * to InitialPosition. This function would be removed when we deprecate 
the KinesisUtils.
+   *
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
--- End diff --

how about `fromKinesisInitialPosition`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134138994
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest : InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon : InitialPosition = TrimHorizon
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date) : InitialPosition = 
AtTimestamp(timestamp)
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+   * to InitialPosition. This function would be removed when we deprecate 
the KinesisUtils.
+   *
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
+initialPositionInStream: InitialPositionInStream) : InitialPosition = {
--- End diff --

Added all other review comments. The indentation was making it look weird, 
so skipped the indentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120545
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 ---
@@ -104,12 +103,61 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
   .build()
 assert(dstream.endpointUrl == customEndpointUrl)
 assert(dstream.regionName == customRegion)
-assert(dstream.initialPositionInStream == customInitialPosition)
+assert(dstream.initialPosition.initialPositionInStream
--- End diff --

*nit* Again, I think this could be simplified to:

```scala
assert(dstream.initialPosition == customInitialPosition)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120489
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest : InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon : InitialPosition = TrimHorizon
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date) : InitialPosition = 
AtTimestamp(timestamp)
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+   * to InitialPosition. This function would be removed when we deprecate 
the KinesisUtils.
+   *
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
+initialPositionInStream: InitialPositionInStream) : InitialPosition = {
--- End diff --

*nit* Indent this one more level. Remove space before ```:```.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120477
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest : InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon : InitialPosition = TrimHorizon
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date) : InitialPosition = 
AtTimestamp(timestamp)
--- End diff --

*nit* remove space before ```:```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120566
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 ---
@@ -104,12 +103,61 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
   .build()
 assert(dstream.endpointUrl == customEndpointUrl)
 assert(dstream.regionName == customRegion)
-assert(dstream.initialPositionInStream == customInitialPosition)
+assert(dstream.initialPosition.initialPositionInStream
+  == customInitialPosition.initialPositionInStream)
 assert(dstream.checkpointAppName == customAppName)
 assert(dstream.checkpointInterval == customCheckpointInterval)
 assert(dstream._storageLevel == customStorageLevel)
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+// Testing with InitialPosition.atTimestamp
+val cal = Calendar.getInstance()
+cal.add(Calendar.DATE, -1)
+val timestamp = cal.getTime()
+val initialPositionAtTimestamp = InitialPosition.atTimestamp(timestamp)
+
+val dstreamAtTimestamp = builder
+  .endpointUrl(customEndpointUrl)
+  .regionName(customRegion)
+  .initialPosition(initialPositionAtTimestamp)
+  .checkpointAppName(customAppName)
+  .checkpointInterval(customCheckpointInterval)
+  .storageLevel(customStorageLevel)
+  .kinesisCredentials(customKinesisCreds)
+  .dynamoDBCredentials(customDynamoDBCreds)
+  .cloudWatchCredentials(customCloudWatchCreds)
+  .build()
+assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+assert(dstreamAtTimestamp.regionName == customRegion)
+assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
--- End diff --

*nit* Again, I think this could be simplified to:

```scala
assert(dstream.initialPosition == initialPositionAtTimestamp)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120535
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 ---
@@ -72,7 +70,8 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
 val dstream = builder.build()
 assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL)
 assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME)
-assert(dstream.initialPositionInStream == 
DEFAULT_INITIAL_POSITION_IN_STREAM)
+assert(dstream.initialPosition.initialPositionInStream
--- End diff --

*nit* Think this would be sufficient:

```scala
assert(dstream.initialPosition == DEFAULT_INITIAL_POSITION)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120475
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest : InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon : InitialPosition = TrimHorizon
--- End diff --

*nit* remove space before ```:```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134120473
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  val instance: InitialPosition = this
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest : InitialPosition = Latest
--- End diff --

*nit* remove space before ```:```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134068101
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  def latest() : InitialPosition = {
+Latest
+  }
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  def trimHorizon() : InitialPosition = {
+TrimHorizon
+  }
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date) : InitialPosition = {
+AtTimestamp(timestamp)
+  }
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
--- End diff --

Ok, I see that it is being used to maintain the original APIs present in 
```KinesisUtils```. However, we should be deprecating ```KinesisUtils``` at 
some (undetermined?) point. Would be good to remove this method at that time as 
well.

I'd suggest adding a comment to the docs for this method indicating it 
exists to maintain compatibility with the original ```KinesisUtils``` API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134063427
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
--- End diff --

Is this necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067474
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -148,18 +149,28 @@ private[kinesis] class KinesisReceiver[T](
 
 kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
+var kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
--- End diff --

Keep this a val, but you can introduce a new scope with a temp val using 
braces, e.g.: 

```scala
val kinesisClientLibConfiguration = {
  val baseClientLibConfiguration = new KinesisClientLibConfiguration(
  checkpointAppName,
  streamName,
  ...
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPosition.initialPositionInStream)
...

  initialPosition match {
// see comment below
...
  }
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134063380
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
--- End diff --

This should be a ```val``` or, better yet, a ```def``` (```def``` can be 
overridden with ```val``` in child classes)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067523
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  def latest() : InitialPosition = {
--- End diff --

I'd just make this a ```val``` or at least remove the parens as this method 
has no side-effects


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067110
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -148,18 +149,28 @@ private[kinesis] class KinesisReceiver[T](
 
 kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
+var kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
   checkpointAppName,
   streamName,
   kinesisProvider,
   dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
   cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
   workerId)
 .withKinesisEndpoint(endpointUrl)
-.withInitialPositionInStream(initialPositionInStream)
+
.withInitialPositionInStream(initialPosition.initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)
 
+// Update the Kinesis client lib config with timestamp
+// if InitialPositionInStream.AT_TIMESTAMP is passed
+kinesisClientLibConfiguration =
+  if (initialPosition.initialPositionInStream == 
InitialPositionInStream.AT_TIMESTAMP) {
--- End diff --

Here's a more-stylish way of doing this in Scala:

```
initialPosition match {
  case atTimestamp: AtTimestamp =>

baseClientLibConfiguration.withTimestampAtInitialPositionInStream(atTimestamp.timestamp)
  case _ =>
baseClientLibConfiguration
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067791
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -308,7 +308,6 @@ object KinesisInputDStream {
   private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
 "https://kinesis.us-east-1.amazonaws.com;
   private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
-  private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: 
InitialPositionInStream =
-InitialPositionInStream.LATEST
+  private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPosition 
= InitialPosition.latest
--- End diff --

*nit* Rename this to ```DEFAULT_INITIAL_POSITION``` to reflect the new 
class name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067693
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  def latest() : InitialPosition = {
+Latest
+  }
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  def trimHorizon() : InitialPosition = {
+TrimHorizon
+  }
+
+  /**
+   * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+   * @return [[AtTimestamp]]
+   */
+  def atTimestamp(timestamp: Date) : InitialPosition = {
+AtTimestamp(timestamp)
+  }
+
+  /**
+   * Returns instance of [[InitialPosition]] based on the passed 
[[InitialPositionInStream]].
+   * @return [[InitialPosition]]
+   */
+  def kinesisInitialPositionInStream(
--- End diff --

Is this method really necessary? Especially if it can only be used for a 
subset of the official ```InitialPositionInStream``` implementations?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134068186
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
--- End diff --

Looks like it is for Java compatibility


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-08-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067537
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  var initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  def instance: InitialPosition = this
+  override var initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  def latest() : InitialPosition = {
+Latest
+  }
+
+  /**
+   * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  def trimHorizon() : InitialPosition = {
--- End diff --

Change to ```val```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org