[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18029 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627994

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
 .streamName(streamName)
 .endpointUrl(endpointUrl)
 .regionName(region)
- .initialPositionInStream(initialPosition)
+ .initialPosition(initialPosition)
 .checkpointAppName(appName)
 .checkpointInterval(checkpointInterval)
 .storageLevel(storageLevel)
 .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+ }
+
+ /**
+ * Test to ensure that the old API for InitialPositionInStream
+ * is supported in KinesisDStream.Builder.
+ * This test would be removed when we deprecate the KinesisUtils.
+ */
+ @Test
+ public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
+.streamingContext(ssc)
+.streamName(streamName)
+.endpointUrl(endpointUrl)
+.regionName(region)
+.initialPositionInStream(InitialPositionInStream.LATEST)
+.checkpointAppName(appName)
+.checkpointInterval(checkpointInterval)
+.storageLevel(storageLevel)
+.build();
+assert(kinesisDStream.streamName() == streamName);
+assert(kinesisDStream.endpointUrl() == endpointUrl);
+assert(kinesisDStream.regionName() == region);
+assert(kinesisDStream.initialPosition().getPosition() == InitialPositionInStream.LATEST);
 assert(kinesisDStream.checkpointAppName() == appName);
 assert(kinesisDStream.checkpointInterval() == checkpointInterval);
 assert(kinesisDStream._storageLevel() == storageLevel);
 ssc.stop();
 }
+
+ /**
+ * Test to ensure that the old API for InitialPositionInStream
+ * is supported in KinesisDStream.Builder.
+ * Test old API doesn't support the InitialPositionInStream.AT_TIMESTAMP.
+ * This test would be removed when we deprecate the KinesisUtils.
+ */
+ @Test
+ public void testJavaKinesisDStreamBuilderOldApiAtTimestamp() {
--- End diff --

This test could be moved to become a Scala test instead, using
```scala
intercept[UnsupportedOperationException] {
  ...
}
```
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627640

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
 * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Region name used by the Kinesis Client Library for
 *DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * @param initialPosition Instance of [[KinesisInitialPosition]]
+ * In the absence of Kinesis checkpoint info, this is the
--- End diff --

nit: indentation
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627744

--- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * A java wrapper for exposing [[InitialPositionInStream]]
+ * to the corresponding Kinesis readers.
+ */
+interface KinesisInitialPosition {
+InitialPositionInStream getPosition();
+}
+
+public class KinesisInitialPositions {
+public static class Latest implements KinesisInitialPosition, Serializable {
+public Latest() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.LATEST;
+}
+}
+
+public static class TrimHorizon implements KinesisInitialPosition, Serializable {
+public TrimHorizon() {}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.TRIM_HORIZON;
+}
+}
+
+public static class AtTimestamp implements KinesisInitialPosition, Serializable {
+private Date timestamp;
+
+public AtTimestamp(Date timestamp) {
+this.timestamp = timestamp;
+}
+
+@Override
+public InitialPositionInStream getPosition() {
+return InitialPositionInStream.AT_TIMESTAMP;
+}
+
+public Date getTimestamp() {
+return timestamp;
+}
+}
+
+
+/**
+ * Returns instance of [[KinesisInitialPosition]] based on the passed [[InitialPositionInStream]].
+ * This method is used in KinesisUtils for translating the InitialPositionInStream
+ * to InitialPosition. This function would be removed when we deprecate the KinesisUtils.
+ *
+ * @return [[InitialPosition]]
+ */
+public static KinesisInitialPosition fromKinesisInitialPosition(
+InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException {
+if (initialPositionInStream == InitialPositionInStream.LATEST) {
+return new Latest();
+} else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
+return new TrimHorizon();
+} else {
+// InitialPositionInStream.AT_TIMESTAMP is not supported.
+// Use InitialPosition.atTimestamp(timestamp) instead.
+throw new UnsupportedOperationException(
+"Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " +
+"supported in initialPositionInStream(). Please use the initialPosition() from " +
+"builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP");
+}
+}
+}
--- End diff --

nit: new line
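The translation done by `fromKinesisInitialPosition` in the diff above can be tried in isolation. What follows is a minimal sketch, not the PR's code: it assumes a stand-in enum `StreamPosition` in place of the KCL's `InitialPositionInStream` so that it compiles without the AWS SDK, and the names `InitialPositionSketch`, `Position` and `fromLegacy` are hypothetical.

```java
import java.io.Serializable;
import java.util.Date;

// Sketch only: StreamPosition is a stand-in for the KCL enum InitialPositionInStream,
// so this compiles without the AWS SDK. All names in this file are hypothetical.
class InitialPositionSketch {

  enum StreamPosition { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

  // Wrapper type: every position can report the underlying enum value.
  interface Position extends Serializable {
    StreamPosition getPosition();
  }

  static final class Latest implements Position {
    @Override public StreamPosition getPosition() { return StreamPosition.LATEST; }
  }

  static final class TrimHorizon implements Position {
    @Override public StreamPosition getPosition() { return StreamPosition.TRIM_HORIZON; }
  }

  // Only this variant carries extra data (the timestamp to start reading from).
  static final class AtTimestamp implements Position {
    private final Date timestamp;
    AtTimestamp(Date timestamp) { this.timestamp = timestamp; }
    @Override public StreamPosition getPosition() { return StreamPosition.AT_TIMESTAMP; }
    Date getTimestamp() { return timestamp; }
  }

  // Translates a legacy enum value into a wrapper. AT_TIMESTAMP is rejected
  // because the enum value alone does not carry a timestamp.
  static Position fromLegacy(StreamPosition legacy) {
    switch (legacy) {
      case LATEST:
        return new Latest();
      case TRIM_HORIZON:
        return new TrimHorizon();
      default:
        throw new UnsupportedOperationException(
            "AT_TIMESTAMP needs a timestamp; construct AtTimestamp directly");
    }
  }

  public static void main(String[] args) {
    System.out.println(fromLegacy(StreamPosition.LATEST).getPosition());
    try {
      fromLegacy(StreamPosition.AT_TIMESTAMP);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Rejecting `AT_TIMESTAMP` in the legacy path keeps the old enum-based setter usable for the two positions it can fully describe, while callers who need a timestamp are pushed to the wrapper type.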
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r158627941

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
 .streamName(streamName)
 .endpointUrl(endpointUrl)
 .regionName(region)
- .initialPositionInStream(initialPosition)
+ .initialPosition(initialPosition)
 .checkpointAppName(appName)
 .checkpointInterval(checkpointInterval)
 .storageLevel(storageLevel)
 .build();
 assert(kinesisDStream.streamName() == streamName);
 assert(kinesisDStream.endpointUrl() == endpointUrl);
 assert(kinesisDStream.regionName() == region);
-assert(kinesisDStream.initialPositionInStream() == initialPosition);
+assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition());
+assert(kinesisDStream.checkpointAppName() == appName);
+assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+assert(kinesisDStream._storageLevel() == storageLevel);
+ssc.stop();
+ }
+
+ /**
+ * Test to ensure that the old API for InitialPositionInStream
+ * is supported in KinesisDStream.Builder.
+ * This test would be removed when we deprecate the KinesisUtils.
+ */
+ @Test
+ public void testJavaKinesisDStreamBuilderOldApi() {
+String streamName = "a-very-nice-stream-name";
+String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+String region = "us-west-2";
+String appName = "a-very-nice-kinesis-app";
+Duration checkpointInterval = Seconds.apply(30);
+StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
+.streamingContext(ssc)
--- End diff --

nit: indentation should be 2 spaces.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yashs360 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r157086878

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+ val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

Hi @brkyvz, thanks for the review. Are you suggesting that I put everything into a new object and refer to the case objects from the Java class methods?
In that case, would it be better to create the objects in Java and expose them directly? We will have cases where we need direct access to the case objects/classes (instead of the Java methods), as in one of the test cases:
`initialPosition.asInstanceOf[AtTimestamp].timestamp`

I could create a new branch with the changes and share it with you, if that's fine.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266325

--- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
--- End diff --

please add `@InterfaceStability.Evolving` above
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156265997

--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
 .build()
 assert(dstream.endpointUrl == customEndpointUrl)
 assert(dstream.regionName == customRegion)
-assert(dstream.initialPositionInStream == customInitialPosition)
+assert(dstream.initialPosition == customInitialPosition)
 assert(dstream.checkpointAppName == customAppName)
 assert(dstream.checkpointInterval == customCheckpointInterval)
 assert(dstream._storageLevel == customStorageLevel)
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val cal = Calendar.getInstance()
+cal.add(Calendar.DATE, -1)
+val timestamp = cal.getTime()
+val initialPositionAtTimestamp = AtTimestamp(timestamp)
+
+val dstreamAtTimestamp = builder
+ .endpointUrl(customEndpointUrl)
+ .regionName(customRegion)
+ .initialPosition(initialPositionAtTimestamp)
+ .checkpointAppName(customAppName)
+ .checkpointInterval(customCheckpointInterval)
+ .storageLevel(customStorageLevel)
+ .kinesisCredentials(customKinesisCreds)
+ .dynamoDBCredentials(customDynamoDBCreds)
+ .cloudWatchCredentials(customCloudWatchCreds)
+ .build()
+assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+assert(dstreamAtTimestamp.regionName == customRegion)
+assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
+ == initialPositionAtTimestamp.initialPositionInStream)
+assert(
+ dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp))
+assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval)
+assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
+
+// Testing with AtTimestamp
+val initialPositionAtTimestamp2 = AtTimestamp(timestamp)
--- End diff --

How are the following lines a different test?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r156266783

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+ val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

Can you wrap all of these in an object?
```scala
sealed trait InitialPosition {
  ...
}

object internal {
  case object Latest extends InitialPosition { }
  ...
  case class AtTimestamp(timestamp: Date) extends InitialPosition { }
}
```

Note how `InitialPosition` is outside and `internal` is lowercase, so that people go only through the Java interface (`org.apache.spark.streaming.kinesis.Latest()`, etc.).

Your documentation and test cases go through the Scala interface, which makes it super weird to have two things corresponding to the same thing.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r144336164

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -182,14 +182,29 @@ object KinesisInputDStream {
 /**
 * Sets the initial position data is read from in the Kinesis stream. Defaults to
+ * [[Latest]] if no custom value is specified.
+ *
+ * @param initialPosition [[InitialPosition]] value specifying where Spark Streaming
+ *will start reading records in the Kinesis stream from
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def initialPosition(initialPosition: InitialPosition): Builder = {
+ this.initialPosition = Option(initialPosition)
+ this
+}
+
+/**
+ * Sets the initial position data is read from in the Kinesis stream. Defaults to
 * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * This function would be removed when we deprecate the KinesisUtils.
 *
 * @param initialPosition InitialPositionInStream value specifying where Spark Streaming
 *will start reading records in the Kinesis stream from
 * @return Reference to this [[KinesisInputDStream.Builder]]
 */
+@deprecated("use initialPosition(initialPosition: InitialPosition)", "2.0.0")
--- End diff --

2.3.0
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850792

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T](
 kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
- checkpointAppName,
- streamName,
- kinesisProvider,
- dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
- cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
- workerId)
+
+val kinesisClientLibConfiguration = {
+ val baseClientLibConfiguration = new KinesisClientLibConfiguration(
+checkpointAppName,
+streamName,
+kinesisProvider,
+dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+workerId)
 .withKinesisEndpoint(endpointUrl)
-.withInitialPositionInStream(initialPositionInStream)
+ .withInitialPositionInStream(initialPosition.initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)
+ // Update the Kinesis client lib config with timestamp
+ // if InitialPositionInStream.AT_TIMESTAMP is passed
+ initialPosition match {
+case atTimestamp: AtTimestamp =>
--- End diff --

nit:
```scala
initialPosition match {
  case AtTimestamp(ts) => baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
  ...
}
```
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850553

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -182,14 +181,14 @@ object KinesisInputDStream {
 /**
 * Sets the initial position data is read from in the Kinesis stream. Defaults to
- * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * [[InitialPosition.latest]] if no custom value is specified.
 *
- * @param initialPosition InitialPositionInStream value specifying where Spark Streaming
+ * @param initialPosition [[InitialPosition]] value specifying where Spark Streaming
 *will start reading records in the Kinesis stream from
 * @return Reference to this [[KinesisInputDStream.Builder]]
 */
-def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
--- End diff --

Don't remove this API, since that will break compatibility. Instead, add an API to take the timestamp (`withTimestamp`). In the end, if we see that `withTimestamp` has been set but the initial position isn't `AtTimestamp`, we throw an error. Likewise, if `AtTimestamp` is set but no timestamp has been provided, we also throw an error.
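The two consistency checks described in this comment can be sketched as follows. This is a minimal illustration under stated assumptions, not the builder from the PR: `StreamBuilderSketch`, its `build()` return value and the stand-in enum `StreamPosition` (used in place of the KCL's `InitialPositionInStream`) are hypothetical, and later revisions of the diff in this thread take a different route by adding an `initialPosition(...)` method.

```java
import java.util.Date;

// Sketch only: a hypothetical builder illustrating the two checks suggested in the
// review. StreamPosition stands in for the KCL enum InitialPositionInStream.
class StreamBuilderSketch {

  enum StreamPosition { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

  private StreamPosition position = StreamPosition.LATEST; // default when nothing is set
  private Date timestamp;                                  // null means "not provided"

  // The existing setter is kept, so current callers keep compiling.
  StreamBuilderSketch initialPositionInStream(StreamPosition position) {
    this.position = position;
    return this;
  }

  // New, purely additive setter for the timestamp.
  StreamBuilderSketch withTimestamp(Date timestamp) {
    this.timestamp = timestamp;
    return this;
  }

  // Validates the combination of settings, then returns a description of the
  // resolved starting point (a real builder would construct the stream here).
  String build() {
    if (timestamp != null && position != StreamPosition.AT_TIMESTAMP) {
      throw new IllegalArgumentException(
          "timestamp was set but the initial position is " + position);
    }
    if (position == StreamPosition.AT_TIMESTAMP && timestamp == null) {
      throw new IllegalArgumentException("AT_TIMESTAMP requires a timestamp");
    }
    return timestamp == null ? position.name() : position.name() + "@" + timestamp.getTime();
  }

  public static void main(String[] args) {
    System.out.println(new StreamBuilderSketch().build());
    System.out.println(new StreamBuilderSketch()
        .initialPositionInStream(StreamPosition.AT_TIMESTAMP)
        .withTimestamp(new Date(0L))
        .build());
  }
}
```

Deferring both checks to `build()` means the setters can be called in any order; only the final combination is validated.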
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849596

--- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
--- End diff --

Can you instead return `Latest$.MODULE$`
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850857

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -45,15 +43,15 @@ public void testJavaKinesisDStreamBuilder() {
 .streamName(streamName)
 .endpointUrl(endpointUrl)
 .regionName(region)
- .initialPositionInStream(initialPosition)
--- End diff --

ditto on API change
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849691

--- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
+}
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP.
+ * @param timestamp
+ * @return
+ */
+public static InitialPosition AtTimestamp(Date timestamp) {
+return InitialPosition$.MODULE$.atTimestamp(timestamp);
--- End diff --

`AtTimestamp.apply(timestamp)`
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849634

--- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
--- End diff --

Can you instead return `TrimHorizon$.MODULE$`
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143849442 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. + */ +case object TrimHorizon extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. 
+ */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon: InitialPosition = TrimHorizon --- End diff -- We no longer need a separate Scala API for this. We can just use the Java objects directly.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850590 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala --- @@ -182,14 +181,14 @@ object KinesisInputDStream { /** * Sets the initial position data is read from in the Kinesis stream. Defaults to - * [[InitialPositionInStream.LATEST]] if no custom value is specified. + * [[InitialPosition.latest]] if no custom value is specified. --- End diff -- don't change docs
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137926490 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- Implemented a new Java wrapper for the API!
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137684968 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- I've implemented the functions with this capitalized naming, but I still feel a bit salty about it :)
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137343555 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- Good question. They used to be just classes. There is no nice way to use a `case object` from Java (you need to add a `$` after the class name, e.g. `TrimHorizon$`), so we decided to go the class-like syntax route. In Scala, this still lets you use the class as `KinesisInitialPosition.Latest`, for example.
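For context on the `$` remark: scalac compiles a top-level `object Foo` to a JVM class named `Foo$` and keeps the singleton in a static field called `MODULE$`, so from Java the object itself is reached as `TrimHorizon$.MODULE$`. The sketch below hand-writes that shape in plain Java purely for illustration. The types here are stand-ins, not the Spark sources, and the `KinesisInitialPosition` holder is only a guess at how a capitalized static factory could hide the `$` spelling.

```java
// Hand-written stand-ins that mimic what scalac emits for
// `case object TrimHorizon extends InitialPosition`. Not the Spark sources.
interface InitialPosition {}

// scalac names the singleton's class `TrimHorizon$` and stores it in MODULE$.
final class TrimHorizon$ implements InitialPosition {
    public static final TrimHorizon$ MODULE$ = new TrimHorizon$();

    private TrimHorizon$() {}
}

// Hypothetical Java-side holder with a capitalized static factory.
final class KinesisInitialPosition {
    private KinesisInitialPosition() {}

    public static InitialPosition TrimHorizon() {
        return TrimHorizon$.MODULE$;
    }
}

class CaseObjectInteropDemo {
    public static void main(String[] args) {
        // What a Java caller has to write to reach a Scala object directly.
        InitialPosition viaModule = TrimHorizon$.MODULE$;
        // The friendlier spelling discussed in this thread.
        InitialPosition viaFactory = KinesisInitialPosition.TrimHorizon();
        // Both expressions refer to the same singleton.
        System.out.println(viaModule == viaFactory);
    }
}
```

The factory costs one extra class but keeps `$` and `MODULE$` out of user code, which appears to be the trade-off being weighed here.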
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137342850 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- I like `InitialPosition initialPosition = KinesisInitialPosition.TrimHorizon();` best :)
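A minimal sketch of what a Java class supporting the preferred call site might look like, modeled loosely on the static-factory style of `Trigger.java`. This is not the code that was merged: the enum is a local stand-in for the KCL `InitialPositionInStream` so the example compiles without the AWS dependency, `AtTimestampPosition` is a hypothetical name, and `getPosition()` is borrowed from the accessor used in the Java test diff elsewhere in this thread.

```java
import java.util.Date;

// Local stand-in for the KCL enum, so the sketch has no AWS dependency.
enum InitialPositionInStream { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

// Minimal position type; getPosition() mirrors the accessor seen in the test diff.
interface InitialPosition {
    InitialPositionInStream getPosition();
}

final class KinesisInitialPosition {
    private KinesisInitialPosition() {}

    // The two stateless positions can be shared singletons.
    private static final InitialPosition LATEST = () -> InitialPositionInStream.LATEST;
    private static final InitialPosition TRIM_HORIZON = () -> InitialPositionInStream.TRIM_HORIZON;

    // Hypothetical carrier for the timestamp; defensively copies the mutable Date.
    public static final class AtTimestampPosition implements InitialPosition {
        private final Date timestamp;

        private AtTimestampPosition(Date timestamp) {
            this.timestamp = new Date(timestamp.getTime());
        }

        public Date getTimestamp() {
            return new Date(timestamp.getTime());
        }

        @Override
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.AT_TIMESTAMP;
        }
    }

    // Capitalized factory names, following the Trigger.Once() convention.
    public static InitialPosition Latest() { return LATEST; }

    public static InitialPosition TrimHorizon() { return TRIM_HORIZON; }

    public static InitialPosition AtTimestamp(Date timestamp) {
        return new AtTimestampPosition(timestamp);
    }
}

class KinesisInitialPositionDemo {
    public static void main(String[] args) {
        InitialPosition initialPosition = KinesisInitialPosition.TrimHorizon();
        System.out.println(initialPosition.getPosition());
        System.out.println(KinesisInitialPosition.AtTimestamp(new Date(0L)).getPosition());
    }
}
```

Because the factories are ordinary static methods, the same spelling works from both Java and Scala callers.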
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137245127 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- Looks good. How would you compare the two syntaxes: `InitialPosition initialPosition = TrimHorizon.instance();`, or introducing a new Java class, KinesisInitialPosition.java, for `InitialPosition initialPosition = KinesisInitialPosition.TrimHorizon();`?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137237166 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this --- End diff -- It was required by the Java API, so that callers can use `TrimHorizon.instance()`.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137235013 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- Interesting. Could you please explain why we use this capitalization? `Once()` and `ProcessingTime()` are methods, so shouldn't they be camelCased?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137086237 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon: InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date): InitialPosition = AtTimestamp(timestamp) + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( +initialPositionInStream: InitialPositionInStream): InitialPosition = { +if (initialPositionInStream == InitialPositionInStream.LATEST) { + latest +} else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) { + trimHorizon +} else { + // InitialPositionInStream.AT_TIMESTAMP is not supported. + // Use InitialPosition.atTimestamp(timestamp) instead. 
+ throw new UnsupportedOperationException( +"Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON" + + "supported in initialPositionInStream(). Use InitialPosition.atTimestamp(timestamp)" + --- End diff -- Rather, `please use the builder API in ... to use AT_TIMESTAMP`
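To make the suggestion concrete, here is a hedged sketch of the legacy translation with the reworded error message. It is an illustration only: the enum and the `InitialPosition` interface are local stand-ins, the method name `fromKinesisInitialPosition` is taken from a rename proposed elsewhere in this review, and the exact message wording is an assumption (the class named in the reviewer's `...` is left unfilled). Writing it out also shows one detail: the quoted diff concatenates `"...TRIM_HORIZON"` directly with `"supported..."`, so the two literals need a separating space.

```java
// Local stand-in for the KCL enum, so the sketch compiles without the AWS SDK.
enum InitialPositionInStream { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

// Hypothetical minimal position type used only by this sketch.
interface InitialPosition {
    InitialPositionInStream getPosition();
}

final class LegacyPositionTranslation {
    private LegacyPositionTranslation() {}

    // Name follows the rename suggested in review; the body is illustrative.
    static InitialPosition fromKinesisInitialPosition(InitialPositionInStream position) {
        switch (position) {
            case LATEST:
                return () -> InitialPositionInStream.LATEST;
            case TRIM_HORIZON:
                return () -> InitialPositionInStream.TRIM_HORIZON;
            default:
                // AT_TIMESTAMP needs a timestamp the legacy enum cannot carry.
                // The first literal ends with a space so the words do not run together.
                throw new UnsupportedOperationException(
                    "Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON "
                        + "are supported here. Please use the builder API to use AT_TIMESTAMP.");
        }
    }
}

class LegacyPositionTranslationDemo {
    public static void main(String[] args) {
        System.out.println(
            LegacyPositionTranslation
                .fromKinesisInitialPosition(InitialPositionInStream.LATEST)
                .getPosition());
        try {
            LegacyPositionTranslation.fromKinesisInitialPosition(InitialPositionInStream.AT_TIMESTAMP);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Pointing users at the builder rather than at an internal factory keeps the message valid even if the factory is later renamed or moved.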
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137086738 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- For these to be Java-friendly, we may need to put them in a Java file, similar to https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r136725034 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- nit: The first letters can be capitalized. `Latest`, `TrimHorizon`, `AtTimestamp`... Similar to `Trigger.Once()` or `Trigger.ProcessingTime(..)` in Structured Streaming
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137083713 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this --- End diff -- why do you need the `instance`s?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137085859 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon: InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date): InitialPosition = AtTimestamp(timestamp) + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( --- End diff -- how about `fromKinesisInitialPosition`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134138994 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon : InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = AtTimestamp(timestamp) + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( +initialPositionInStream: InitialPositionInStream) : InitialPosition = { --- End diff -- Added all other review comments. The indentation was making it look weird, so skipped the indentation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120545 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala --- @@ -104,12 +103,61 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .build() assert(dstream.endpointUrl == customEndpointUrl) assert(dstream.regionName == customRegion) -assert(dstream.initialPositionInStream == customInitialPosition) +assert(dstream.initialPosition.initialPositionInStream --- End diff -- *nit* Again, I think this could be simplified to: ```scala assert(dstream.initialPosition == customInitialPosition) ```
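The simplification suggested above relies on the structural equality that the Scala compiler generates for case classes and case objects. The following is only a minimal sketch of that behaviour; `Position`, `LatestPos` and `AtTime` are hypothetical stand-ins, not the classes from the PR:

```scala
import java.util.Date

// Hypothetical stand-ins for the PR's InitialPosition hierarchy.
sealed trait Position
case object LatestPos extends Position
case class AtTime(timestamp: Date) extends Position

object EqualityDemo {
  val t = new Date(0L)
  // Case classes compare field by field, so two instances built from
  // equal Dates are equal even though they are distinct objects.
  val sameTimestamp: Boolean = AtTime(t) == AtTime(new Date(0L))
  // Different timestamps make the instances unequal.
  val differentTimestamp: Boolean = AtTime(t) == AtTime(new Date(1L))
  // A case object is a singleton and is equal to itself.
  val sameObject: Boolean = (LatestPos: Position) == LatestPos
}
```

Under these assumptions, comparing the whole position object is also stricter than comparing only the wrapped enum value, because for a timestamp-based position it checks the timestamp as well.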
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120489 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon : InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = AtTimestamp(timestamp) + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( +initialPositionInStream: InitialPositionInStream) : InitialPosition = { --- End diff -- *nit* Indent this one more level. Remove space before ```:```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120477 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon : InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = AtTimestamp(timestamp) --- End diff -- *nit* remove space before ```:``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120566 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala --- @@ -104,12 +103,61 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .build() assert(dstream.endpointUrl == customEndpointUrl) assert(dstream.regionName == customRegion) -assert(dstream.initialPositionInStream == customInitialPosition) +assert(dstream.initialPosition.initialPositionInStream + == customInitialPosition.initialPositionInStream) assert(dstream.checkpointAppName == customAppName) assert(dstream.checkpointInterval == customCheckpointInterval) assert(dstream._storageLevel == customStorageLevel) assert(dstream.kinesisCreds == customKinesisCreds) assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds)) assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds)) + +// Testing with InitialPosition.atTimestamp +val cal = Calendar.getInstance() +cal.add(Calendar.DATE, -1) +val timestamp = cal.getTime() +val initialPositionAtTimestamp = InitialPosition.atTimestamp(timestamp) + +val dstreamAtTimestamp = builder + .endpointUrl(customEndpointUrl) + .regionName(customRegion) + .initialPosition(initialPositionAtTimestamp) + .checkpointAppName(customAppName) + .checkpointInterval(customCheckpointInterval) + .storageLevel(customStorageLevel) + .kinesisCredentials(customKinesisCreds) + .dynamoDBCredentials(customDynamoDBCreds) + .cloudWatchCredentials(customCloudWatchCreds) + .build() +assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl) +assert(dstreamAtTimestamp.regionName == customRegion) +assert(dstreamAtTimestamp.initialPosition.initialPositionInStream --- End diff -- *nit* Again, I think this could be simplified to: ```scala assert(dstream.initialPosition == initialPositionAtTimestamp) ```
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120535 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala --- @@ -72,7 +70,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE val dstream = builder.build() assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL) assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME) -assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM) +assert(dstream.initialPosition.initialPositionInStream --- End diff -- *nit* Think this would be sufficient: ```scala assert(dstream.initialPosition == DEFAULT_INITIAL_POSITION) ```
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120475 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon : InitialPosition = TrimHorizon --- End diff -- *nit* remove space before ```:``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134120473 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest --- End diff -- *nit* remove space before ```:``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134068101 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * Returns instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + def latest() : InitialPosition = { +Latest + } + + /** + * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + def trimHorizon() : InitialPosition = { +TrimHorizon + } + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = { +AtTimestamp(timestamp) + } + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( --- End diff -- Ok, I see that it is being used to maintain the original APIs present in ```KinesisUtils```. However, we should be deprecating ```KinesisUtils``` at some (undetermined?) point. Would be good to remove this method at that time as well. I'd suggest adding a comment to the docs for this method indicating it exists to maintain compatibility with the original ```KinesisUtils``` API.
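A compatibility helper of the kind discussed above could look roughly like the sketch below. All names (`OldPosition`, `Position`, `PositionCompat.fromOld`) are hypothetical stand-ins rather than the PR's code, and rejecting the timestamp variant with `UnsupportedOperationException` is an assumption taken from the Java test comment elsewhere in this thread:

```scala
import java.util.Date

// Hypothetical stand-in for the KCL InitialPositionInStream enum.
object OldPosition extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// Hypothetical stand-ins for the new position hierarchy.
sealed trait Position
case object LatestPos extends Position
case object TrimHorizonPos extends Position
case class AtTime(timestamp: Date) extends Position

object PositionCompat {
  // Translates the old enum-only API to the new hierarchy.
  // AT_TIMESTAMP cannot be translated because the enum carries no timestamp.
  def fromOld(old: OldPosition.Value): Position = old match {
    case OldPosition.LATEST => LatestPos
    case OldPosition.TRIM_HORIZON => TrimHorizonPos
    case _ =>
      throw new UnsupportedOperationException(
        "AT_TIMESTAMP needs a timestamp; use the new API instead")
  }
}
```

Keeping the translation in one clearly documented method makes it easy to delete once the legacy entry point is deprecated.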
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134063427 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this --- End diff -- Is this necessary? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067474

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -148,18 +149,28 @@ private[kinesis] class KinesisReceiver[T](
     kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
     val kinesisProvider = kinesisCreds.provider
-    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
+    var kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
--- End diff --

Keep this a val, but you can introduce a new scope with a temp val using braces, e.g.:

```scala
val kinesisClientLibConfiguration = {
  val baseClientLibConfiguration = new KinesisClientLibConfiguration(
    checkpointAppName,
    streamName,
    ...
    .withKinesisEndpoint(endpointUrl)
    .withInitialPositionInStream(initialPosition.initialPositionInStream)
    ...
  initialPosition match {
    // see comment below
    ...
  }
}
```
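A minimal, self-contained sketch of the block-scoped `val` idea suggested above. `ClientConfig`, `withEndpoint`, `withTimestamp`, and `ScopedValExample` are hypothetical stand-ins invented for illustration; they are not the real `KinesisClientLibConfiguration` API.

```scala
// Hypothetical stand-in for KinesisClientLibConfiguration; not the real KCL class.
final case class ClientConfig(
    appName: String,
    endpoint: Option[String] = None,
    timestampMillis: Option[Long] = None) {
  def withEndpoint(url: String): ClientConfig = copy(endpoint = Some(url))
  def withTimestamp(millis: Long): ClientConfig = copy(timestampMillis = Some(millis))
}

object ScopedValExample {
  def buildConfig(appName: String, startAt: Option[Long]): ClientConfig = {
    // The outer binding stays a val; the temporary `base` is visible only inside the braces.
    val config = {
      val base = ClientConfig(appName).withEndpoint("https://kinesis.us-east-1.amazonaws.com")
      startAt match {
        case Some(millis) => base.withTimestamp(millis)
        case None => base
      }
    }
    config
  }
}
```

The block is an expression, so its last value becomes the value of `config` and no reassignment (and hence no `var`) is needed.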
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134063380 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream --- End diff -- This should be a ```val``` or, better yet, a ```def``` (```def``` can be overridden with ```val``` in child classes) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
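To illustrate the `def`-overridden-by-`val` point, here is a small sketch. The names `Position`, `LatestPosition`, and `TimestampPosition` are made up for this example and only mirror the shape of the trait under review.

```scala
// Illustrative names only; these mirror the shape of the reviewed trait, not its final form.
sealed trait Position {
  // Abstract def: implementations may satisfy it with either a def or a val.
  def name: String
}

case object LatestPosition extends Position {
  // A val can implement a def, since it always yields the same value.
  override val name: String = "LATEST"
}

final case class TimestampPosition(millis: Long) extends Position {
  // A def is still allowed when the value is computed on demand.
  override def name: String = s"AT_TIMESTAMP($millis)"
}
```

The reverse direction does not compile: a member declared as a `val` in the trait cannot be overridden with a `def`, which is why declaring a `def` in the trait is the more flexible choice.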
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134067523 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * Returns instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + def latest() : InitialPosition = { --- End diff -- I'd just make this a ```val``` or at least remove the parens as this method has no side-effects --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
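A short sketch of the two alternatives mentioned in this comment, using hypothetical names (`Pos`, `LatestPos`, `TrimHorizonPos`) rather than the classes from the pull request:

```scala
sealed trait Pos
case object LatestPos extends Pos
case object TrimHorizonPos extends Pos

object Pos {
  // No arguments and no side effects, so a val is enough.
  val latest: Pos = LatestPos
  // A paren-less def reads the same at the Scala call site.
  def trimHorizon: Pos = TrimHorizonPos
}
```

From Scala, both are used without parentheses, e.g. `Pos.latest` and `Pos.trimHorizon`.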
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067110

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -148,18 +149,28 @@ private[kinesis] class KinesisReceiver[T](
     kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
     val kinesisProvider = kinesisCreds.provider
-    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
+    var kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
       checkpointAppName,
       streamName,
       kinesisProvider,
       dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
       cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
       workerId)
       .withKinesisEndpoint(endpointUrl)
-      .withInitialPositionInStream(initialPositionInStream)
+      .withInitialPositionInStream(initialPosition.initialPositionInStream)
       .withTaskBackoffTimeMillis(500)
       .withRegionName(regionName)
+    // Update the Kinesis client lib config with timestamp
+    // if InitialPositionInStream.AT_TIMESTAMP is passed
+    kinesisClientLibConfiguration =
+      if (initialPosition.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
--- End diff --

Here's a more-stylish way of doing this in Scala:

```scala
initialPosition match {
  case atTimestamp: AtTimestamp =>
    baseClientLibConfiguration.withTimestampAtInitialPositionInStream(atTimestamp.timestamp)
  case _ => baseClientLibConfiguration
}
```
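The pattern-matching suggestion can be tried in isolation with the sketch below. `StartPosition`, its three variants, `Config`, and `withStartTimestamp` are hypothetical stand-ins for `InitialPosition` and `KinesisClientLibConfiguration`; only `java.util.Date` is a real library type here.

```scala
import java.util.Date

// Hypothetical stand-ins: StartPosition mirrors the reviewed InitialPosition trait,
// and Config stands in for KinesisClientLibConfiguration.
sealed trait StartPosition
case object FromLatest extends StartPosition
case object FromTrimHorizon extends StartPosition
final case class FromTimestamp(timestamp: Date) extends StartPosition

final case class Config(startTimestamp: Option[Date] = None) {
  def withStartTimestamp(ts: Date): Config = copy(startTimestamp = Some(ts))
}

object MatchExample {
  // Only the timestamp variant changes the config; every other variant passes through.
  def applyPosition(base: Config, position: StartPosition): Config = position match {
    case FromTimestamp(ts) => base.withStartTimestamp(ts)
    case _ => base
  }
}
```

Matching on the case class gives direct access to the timestamp, so there is no need to compare against an enum value first and then cast.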
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r134067791

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -308,7 +308,6 @@ object KinesisInputDStream {
   private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
     "https://kinesis.us-east-1.amazonaws.com"
   private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
-  private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream =
-    InitialPositionInStream.LATEST
+  private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPosition = InitialPosition.latest
--- End diff --

*nit* Rename this to ```DEFAULT_INITIAL_POSITION``` to reflect the new class name
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134067693 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * Returns instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + def latest() : InitialPosition = { +Latest + } + + /** + * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + def trimHorizon() : InitialPosition = { +TrimHorizon + } + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = { +AtTimestamp(timestamp) + } + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( --- End diff -- Is this method really necessary? Especially if it can only be used for a subset of the official ```InitialPositionInStream``` implementations? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134068186 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this --- End diff -- Looks like it is for Java compatibility --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134067537 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + var initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + def instance: InitialPosition = this + override var initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * Returns instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + def latest() : InitialPosition = { +Latest + } + + /** + * Returns instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + def trimHorizon() : InitialPosition = { --- End diff -- Change to ```val``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org