[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155212619 **[Test build #45421 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45421/consoleFull)** for PR 9421 at commit [`63d2e52`](https://github.com/apache/spark/commit/63d2e5284034c899b5ae0ccdb6e3e1ac5c022f1c). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155214151 LGTM
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155209777 Merged build triggered.
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155156521 One high-level question: in the previous code, checkpoints for multiple shards ran in parallel, but now they run sequentially. Does it matter?
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44330530 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import java.util.concurrent.{TimeoutException, ExecutorService} + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach} +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.Eventually._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase} +import org.apache.spark.util.ManualClock + +class KinesisCheckpointerSuite extends TestSuiteBase + with MockitoSugar + with BeforeAndAfterEach + with PrivateMethodTester + with Eventually { + + private val workerId = "dummyWorkerId" + private val shardId = "dummyShardId" + private val seqNum = "123" + private val otherSeqNum = "245" + private val checkpointInterval = Duration(10) + private val someSeqNum = Some(seqNum) + private val someOtherSeqNum = Some(otherSeqNum) + + private var receiverMock: KinesisReceiver[Array[Byte]] = _ + private var checkpointerMock: IRecordProcessorCheckpointer = _ + private var kinesisCheckpointer: KinesisCheckpointer = _ + private var clock: ManualClock = _ + + private val checkpoint = PrivateMethod[Unit]('checkpoint) + + override def beforeEach(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] +clock = new ManualClock() +kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock) + } + + test("checkpoint is not called twice for the same sequence number") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +kinesisCheckpointer.invokePrivate(checkpoint(shardId, 
checkpointerMock)) +kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is called after sequence number increases") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) +kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) + } + + test("should checkpoint if we have exceeded the checkpoint interval") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) + +kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) +clock.advance(5 * checkpointInterval.milliseconds) + +eventually(timeout(1 second)) { + verify(checkpointerMock, times(1)).checkpoint(seqNum) + verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) +} + } + + test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) + +kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) +clock.advance(checkpointInterval.milliseconds / 2) + +
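The first two tests pin down one rule: a shard's sequence number is checkpointed only when it differs from the last one written. A minimal sketch of that rule, assuming a simplified stand-in for the KCL checkpointer; `RecordingCheckpointer`, `DedupingCheckpointer` and `latestSeqNum` are hypothetical names, not the actual `KinesisCheckpointer` API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for IRecordProcessorCheckpointer that records every call.
class RecordingCheckpointer {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def checkpoint(seqNum: String): Unit = { calls += seqNum; () }
}

// Hypothetical model of the rule the first two tests check: write a checkpoint only when
// the latest sequence number for the shard differs from the last one written.
class DedupingCheckpointer(latestSeqNum: String => Option[String]) {
  // shardId -> sequence number of the last checkpoint that was written
  private val lastCheckpointed = new ConcurrentHashMap[String, String]()

  def checkpoint(shardId: String, cp: RecordingCheckpointer): Unit = {
    latestSeqNum(shardId).foreach { seqNum =>
      // get returns null for an unseen shard, so the first sequence number is always written
      if (lastCheckpointed.get(shardId) != seqNum) {
        cp.checkpoint(seqNum)
        lastCheckpointed.put(shardId, seqNum)
      }
    }
  }
}
```

Calling `checkpoint` twice with the same latest value writes once; if the value changes between calls, both values are written, matching the `times(1)` verifications in the suite.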
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155191287 LGTM except a suggestion for the flaky test.
Github user brkyvz commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155167933 @zsxwing It doesn't matter. Since we always checkpoint the latest value we can checkpoint, it doesn't matter whether it runs sequentially or in parallel.
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155171992
> @zsxwing It doesn't matter. Since we always checkpoint the latest value we can checkpoint, it doesn't matter if it is sequential or in parallel

I didn't mean correctness. I was wondering whether the performance loss is acceptable.
Github user brkyvz commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155175426 @zsxwing It should be acceptable as well. Think about it like this. We have 2 receivers, A and B:
t0 -> A receives batch with seq number x_0, B receives batch with seq number y_0
t1 -> A receives batch with seq number x_1
t2 -> B receives batch with seq number y_1
In the parallel case, if we were to checkpoint at t1 + epsilon, we would checkpoint x_1 for A and y_0 for B. In the sequential case, assume we checkpoint A at t1 + epsilon and B at t2 + epsilon (since each checkpoint takes some time); then we would checkpoint x_1 for A and y_1 for B. Performance-wise it shouldn't be a problem; checkpointing to DynamoDB is a best-effort attempt.
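The timeline above can be replayed with a toy model: each receiver's latest sequence number is a function of time, a parallel pass reads every receiver at the same instant, and a sequential pass reads each receiver at its own, later time. This is a hypothetical simulation; `Timeline`, `latestAt`, the integer arrival times and the epsilon value are illustrative and not taken from the receiver code.

```scala
// Hypothetical replay of the timeline above. Arrival times are whole units (t0, t1, t2),
// listed in increasing order per receiver.
object Timeline {
  val arrivals: Map[String, Seq[(Int, String)]] = Map(
    "A" -> Seq(0 -> "x_0", 1 -> "x_1"),
    "B" -> Seq(0 -> "y_0", 2 -> "y_1")
  )

  // Latest sequence number a receiver has seen at or before time t.
  def latestAt(receiver: String, t: Double): Option[String] =
    arrivals(receiver).filter { case (time, _) => time <= t }.lastOption.map(_._2)

  // Parallel: every receiver is read at the same instant t.
  def parallel(t: Double): Map[String, Option[String]] =
    arrivals.keys.map(r => r -> latestAt(r, t)).toMap

  // Sequential: receivers are read one after another, each at its own time.
  def sequential(readTimes: Seq[(String, Double)]): Map[String, Option[String]] =
    readTimes.map { case (r, t) => r -> latestAt(r, t) }.toMap
}
```

With epsilon = 0.1, `parallel(1.1)` yields x_1 for A and y_0 for B, while `sequential(Seq("A" -> 1.1, "B" -> 2.1))` yields x_1 and y_1. Under these assumptions, as long as each sequential read happens no earlier than the parallel snapshot would have, it sees the same or a newer sequence number.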
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44324452 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,156 @@ ... +clock.advance(checkpointInterval.milliseconds / 2) --- End diff --
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44324123 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,156 @@ ... +clock.advance(checkpointInterval.milliseconds / 2) --- End diff --
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155221046 Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155220893 **[Test build #45421 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45421/consoleFull)** for PR 9421 at commit [`63d2e52`](https://github.com/apache/spark/commit/63d2e5284034c899b5ae0ccdb6e3e1ac5c022f1c).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155209804 Merged build started.
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-155221200 Perfect. Merging this to master and branch 1.6. Thanks @brkyvz for all the effort, and @zsxwing for reviewing.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/9421
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44235736 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala --- @@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T]( */ override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") -reason match { +// We want to return a checkpointer based on the shutdown cause. If we want to terminate, +// then we want to use the given checkpointer to checkpoint one last time. In other cases, +// we return a `null` so that we don't checkpoint one last time. +val cp: IRecordProcessorCheckpointer = reason match { --- End diff -- Can this code just be simplified to:
```
reason match {
  // TERMINATE: pass the given checkpointer so that one final checkpoint is made
  case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer)
  // Any other reason: pass null so that we don't checkpoint
  case _ => receiver.removeCheckpointer(shardId, null)
}
```
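The decision in that `shutdown` match can be isolated from the KCL types. A sketch under stated assumptions: `Reason`, `Terminate`, `Zombie`, `ShutdownModel` and `finalCheckpointer` are hypothetical stand-ins for KCL's `ShutdownReason` values and the shutdown logic, and `Option` replaces the `null` sentinel purely for readability.

```scala
// Hypothetical stand-ins for KCL's ShutdownReason values.
sealed trait Reason
case object Terminate extends Reason
case object Zombie extends Reason

object ShutdownModel {
  // Returns the checkpointer to use for one final checkpoint, if any.
  // Terminate: the shard is closed and fully consumed, so checkpoint one last time.
  // Anything else (e.g. Zombie): another worker may already own the shard, so skip it.
  def finalCheckpointer[C](reason: Reason, checkpointer: C): Option[C] = reason match {
    case Terminate => Some(checkpointer)
    case _         => None
  }
}
```

Returning `Option` makes the "no final checkpoint" case explicit in the type; the code under review expresses the same thing by passing `null` to `removeCheckpointer`.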
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44235617 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], --- End diff -- This reasoning in the comment is no longer relevant. Please update it.
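The contract described in the `removeCheckpointer` doc comment (stop tracking the shard, and make one final checkpoint only when a non-null checkpointer is supplied) can be modelled as below. This is a hypothetical sketch: `Checkpointer`, `ShardRegistry`, `isTracked` and the `latest` lookup are illustrative names, not the real classes, and the method bodies use braces.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical minimal checkpointer interface.
trait Checkpointer {
  def checkpoint(seqNum: String): Unit
}

// Hypothetical registry mirroring the `checkpointers` map (shardId -> checkpointer).
class ShardRegistry(latest: String => Option[String]) {
  private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()

  def setCheckpointer(shardId: String, cp: Checkpointer): Unit = {
    checkpointers.put(shardId, cp)
  }

  def isTracked(shardId: String): Boolean = {
    checkpointers.containsKey(shardId)
  }

  /** Stop tracking shardId; checkpoint one last time only if `cp` is non-null. */
  def removeCheckpointer(shardId: String, cp: Checkpointer): Unit = {
    checkpointers.remove(shardId)
    if (cp != null) {
      latest(shardId).foreach(cp.checkpoint)
    }
  }
}
```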
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44235599 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.util.RecurringTimer +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + * @param clock In order to use ManualClocks for the purpose of testing + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String, +clock: Clock = new SystemClock) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread: RecurringTimer = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = --- End diff -- nit: Add braces around the method.
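The new `clock: Clock = new SystemClock` parameter lets tests drive the checkpoint interval by hand, as the suite does with `ManualClock` and `clock.advance(...)`. A hypothetical sketch of that injection pattern: `TickClock`, `WallClock`, `FakeClock` and `IntervalCheckpointer` are illustrative names, not Spark's `Clock`/`ManualClock`/`RecurringTimer`, and `tick()` is called manually here in place of a timer thread.

```scala
// Hypothetical clock abstraction: production code reads wall time, tests set time by hand.
trait TickClock {
  def nowMillis: Long
}

object WallClock extends TickClock {
  def nowMillis: Long = System.currentTimeMillis()
}

class FakeClock(private var t: Long = 0L) extends TickClock {
  def nowMillis: Long = t
  def advance(ms: Long): Unit = { t += ms }
}

// Hypothetical periodic driver, assumed to be called from a single timer thread.
// Each tick runs `doCheckpoint` only if at least `intervalMs` has elapsed since the
// previous run (or since construction), so it fires at most once per tick.
class IntervalCheckpointer(intervalMs: Long, clock: TickClock, doCheckpoint: () => Unit) {
  private var nextDeadline = clock.nowMillis + intervalMs

  def tick(): Unit = {
    if (clock.nowMillis >= nextDeadline) {
      doCheckpoint()
      nextDeadline = clock.nowMillis + intervalMs
    }
  }
}
```

Advancing the fake clock by half an interval and ticking does nothing; advancing past the interval and ticking fires once, which mirrors the "should" and "shouldn't checkpoint" interval tests without relying on wall-clock sleeps.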
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154901716 Overall it looks good. Just a few minor comments. @zsxwing Does this look good to you?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44235801 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -217,6 +226,29 @@ private[kinesis] class KinesisReceiver[T]( } /** + * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the + * given shardId. + */ + private[kinesis] def setCheckpointer( + shardId: String, + checkpointer: IRecordProcessorCheckpointer): Unit = { +assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!") +kinesisCheckpointer.setCheckpointer(shardId, checkpointer) + } + + /** + * Remove the checkpointer for the given shardId. The provided checkpointer will be used to + * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not + * checkpoint. + */ + private[kinesis] def removeCheckpointer( --- End diff -- Same issue as with `setCheckpointer`: the method-level `private[kinesis]` is redundant because the whole class is already `private[kinesis]`.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44235797 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -217,6 +226,29 @@ private[kinesis] class KinesisReceiver[T]( } /** + * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the + * given shardId. + */ + private[kinesis] def setCheckpointer( --- End diff -- The whole class is already `private[kinesis]`, so marking this method `private[kinesis]` is redundant. It's not a big deal, really, but it causes a bit of confusion while reading the code, as if the whole class were public. Please be aware of such issues elsewhere.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154931538 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154931548 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154944163 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154943927 **[Test build #45344 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45344/consoleFull)** for PR 9421 at commit [`f29814c`](https://github.com/apache/spark/commit/f29814c2e65ae434a7ef20977d2401845d5a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154932101 **[Test build #45344 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45344/consoleFull)** for PR 9421 at commit [`f29814c`](https://github.com/apache/spark/commit/f29814c2e65ae434a7ef20977d2401845d5a).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154513836 Build started sha1 is merged.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154522193 **[Test build #45241 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45241/consoleFull)** for PR 9421 at commit [`36ac43e`](https://github.com/apache/spark/commit/36ac43e5c4f06a4a8e173af7b1e8e1038d214323). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154522342 Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45241/
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154522341 Build finished. 168 tests run, 0 skipped, 0 failed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154167783 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154167741 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154173376 **[Test build #45139 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/consoleFull)** for PR 9421 at commit [`457c6c6`](https://github.com/apache/spark/commit/457c6c689a5b309137f7a91f4466c4bac19e19f0).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168797 **[Test build #45137 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/consoleFull)** for PR 9421 at commit [`95e366c`](https://github.com/apache/spark/commit/95e366c5f5284ff39075bb9c530cf0f8f7180c8a).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168895 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168710 **[Test build #45134 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/consoleFull)** for PR 9421 at commit [`e0cabf7`](https://github.com/apache/spark/commit/e0cabf7572fdd293a45ce765befde711ad5dea6e).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154172998 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154173038 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154165611 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154165535 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168885 **[Test build #45134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/consoleFull)** for PR 9421 at commit [`e0cabf7`](https://github.com/apache/spark/commit/e0cabf7572fdd293a45ce765befde711ad5dea6e). * This patch **fails RAT tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168946 **[Test build #45137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/consoleFull)** for PR 9421 at commit [`95e366c`](https://github.com/apache/spark/commit/95e366c5f5284ff39075bb9c530cf0f8f7180c8a). * This patch **fails RAT tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168897 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/ Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168952 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/ Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154168950 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44088617 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + val seqNum = "123" + val otherSeqNum = "245" + val checkpointInterval = Duration(10) + val someSeqNum = Some(seqNum) + val someOtherSeqNum = Some(otherSeqNum) + + var receiverMock: KinesisReceiver[Array[Byte]] = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { --- End diff -- Why even have options in the signature, such that you have to test for them?
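One way to act on this question, sketched under assumptions: handle the nullable checkpointer once at the public boundary and let the internal `checkpoint` helper take a plain, non-optional value, so tests no longer need `None` or `null` cases. `ShardCheckpointer` and `SimpleCheckpointer` are hypothetical names used only for illustration, not the code in this PR.

```scala
// Hypothetical stand-in for the KCL's IRecordProcessorCheckpointer.
trait ShardCheckpointer {
  def checkpoint(seqNum: String): Unit
}

// Hypothetical helper; `latestStoredSeqNum` yields the newest reliably stored sequence number.
class SimpleCheckpointer(latestStoredSeqNum: String => Option[String]) {

  /** Public entry point: a null checkpointer (e.g. on ZOMBIE shutdown) means "skip". */
  def removeCheckpointer(shardId: String, checkpointer: ShardCheckpointer): Unit = {
    if (checkpointer != null) {
      checkpoint(shardId, checkpointer)
    }
  }

  /** Internal helper takes a non-null checkpointer, so there is no Option to test for. */
  def checkpoint(shardId: String, checkpointer: ShardCheckpointer): Unit = {
    latestStoredSeqNum(shardId).foreach(seqNum => checkpointer.checkpoint(seqNum))
  }
}
```

The null check now lives in exactly one place, and the only remaining "nothing to do" case inside `checkpoint` is a shard with no stored data yet.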
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154241512 **[Test build #45174 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/consoleFull)** for PR 9421 at commit [`88a67bc`](https://github.com/apache/spark/commit/88a67bc1cecee6ebfba0f00774ba64308b266eef).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154240332 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154246737 Overall there are a few major suggestions.
- To test the timer logic in `KinesisCheckpointer`, it's best to refactor `KinesisCheckpointer` to use `streaming.util.RecurringTimer` instead of a scheduled executor service, so that it can be tested with a manual clock.
- Some refactoring of the `KinesisCheckpointer` internal logic to remove unnecessary options and to simplify the shutdown logic.
- Some minor test refactoring.
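The first suggestion rests on making time injectable. The following is a simplified sketch, not Spark's `RecurringTimer`: the hypothetical `ClockDrivenTimer` fires its callback once for every elapsed period whenever `tick()` is called, and the hypothetical `FakeClock` plays the role of a manual clock, so a test can advance time deterministically instead of sleeping.

```scala
// Hypothetical clock abstraction, in the spirit of Spark's Clock/ManualClock.
trait TestableClock {
  def getTimeMillis(): Long
}

// Manually advanced clock for tests.
class FakeClock(private var now: Long = 0L) extends TestableClock {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

/**
 * Minimal recurring-timer sketch. There is no background thread here: `tick()` invokes the
 * callback once for each period boundary that the injected clock has already passed.
 */
class ClockDrivenTimer(clock: TestableClock, periodMs: Long, callback: Long => Unit) {
  private var nextTime: Long = clock.getTimeMillis() + periodMs

  def tick(): Unit = {
    while (clock.getTimeMillis() >= nextTime) {
      callback(nextTime)
      nextTime += periodMs
    }
  }
}
```

A real timer would drive itself from its own thread; the point of the sketch is only that the schedule depends on the injected clock rather than on wall time, which is what lets a checkpoint interval be verified without real delays.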
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44082707 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], --- End diff -- Why do we need to do two different things? Unlike typical IRecordProcessor implementations, which checkpoint as soon as the data is received, we checkpoint only data that has been received AND reliably stored in Spark. If some data has already been stored, isn't it strictly better to write the corresponding offset to DynamoDB in every case?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44083492 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpoint(shardId, Option(checkpointer)) +checkpointers.remove(shardId) + } + + /** Perform the checkpoint. Exposed for tests. 
*/ + private[kinesis] def checkpoint( + shardId: String, + checkpointer: Option[IRecordProcessorCheckpointer]): Unit = { +// if this method throws an exception, then the scheduled task will not run again +try { + checkpointer.foreach { cp => +receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum => + val lastSeqNum = lastCheckpointedSeqNums.get(shardId) + // Kinesis sequence numbers are monotonically increasing strings, therefore we can do + // safely do the string comparison + if (lastSeqNum == null || latestSeqNum > lastSeqNum) { +/* Perform the checkpoint */ + KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100) +logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" + + s" $latestSeqNum for shardId $shardId") +lastCheckpointedSeqNums.put(shardId, latestSeqNum) + } +} + } +} catch { + case NonFatal(e) => +logError("Failed to checkpoint to DynamoDB.", e) --- End diff -- This should be a `logWarning`. It's not a serious error, as future attempts may succeed. Task failures are likewise logged as warnings because they are going to be reattempted, and only job failures are logged as errors.
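The diff's comment relies on comparing sequence numbers as plain strings. That is only safe while both strings have the same number of digits: lexicographically, `"9"` sorts after `"10"`. A minimal sketch, assuming sequence numbers are non-negative decimal strings, compares them as `BigInt` instead; the helper name `isNewer` is hypothetical and not part of the PR.

```scala
object SeqNumCompare {
  // Hypothetical helper: true if `latest` should be checkpointed, i.e. nothing
  // has been checkpointed yet (`last` is null) or `latest` is numerically larger.
  // Assumes both values are non-negative decimal strings.
  def isNewer(latest: String, last: String): Boolean =
    last == null || BigInt(latest) > BigInt(last)

  def main(args: Array[String]): Unit = {
    // Plain string comparison goes wrong once the lengths differ.
    println("9" > "10")
    println(isNewer("10", "9"))
    println(isNewer("123", "123"))
    println(isNewer("245", null))
  }
}
```

The `null` check mirrors the diff's handling of a shard that has never been checkpointed.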
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44086804 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T]( shardIdToLatestStoredSeqNum.get(shardId) } + /** Pass the setCheckpointer request to the checkpointer. */ + private[kinesis] def setCheckpointerForShardId( --- End diff -- Since I have the `getCheckpointer` method, I didn't want the two names to look like a getter/setter pair. `getCheckpointer` returns the central `checkpointer`, but `setCheckpointer` is for a specific shardId. Don't you think that would be confusing?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154240335 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45173/ Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154240258 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154240324 **[Test build #45173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45173/consoleFull)** for PR 9421 at commit [`79f47ec`](https://github.com/apache/spark/commit/79f47ec116bd8c17a7ab7266c90c186ec4afaff8).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154240240 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154241059 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44090030 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpoint(shardId, Option(checkpointer)) +checkpointers.remove(shardId) + } + + /** Perform the checkpoint. Exposed for tests. */ + private[kinesis] def checkpoint( + shardId: String, + checkpointer: Option[IRecordProcessorCheckpointer]): Unit = { --- End diff -- But for that, a cleaner design is to update the checkpointer in the map and then call `checkpoint(shardId)`. The current code is confusing to read. Also, why have it as an `Option` and later test for the case where it is None?
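The design suggested in this comment can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `Checkpointer` is a hypothetical stand-in for KCL's `IRecordProcessorCheckpointer`, and the `latestStoredSeqNum` function replaces `receiver.getLatestSeqNumToCheckpoint`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for KCL's IRecordProcessorCheckpointer.
trait Checkpointer {
  def checkpoint(seqNum: String): Unit
}

// Sketch: the map is the only source of checkpointers, so checkpoint(shardId)
// looks one up instead of receiving an Option.
class CheckpointTracker(latestStoredSeqNum: String => Option[String]) {
  private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()
  private val lastCheckpointed = new ConcurrentHashMap[String, String]()

  def setCheckpointer(shardId: String, cp: Checkpointer): Unit = {
    checkpointers.put(shardId, cp)
  }

  // Non-null (e.g. on TERMINATE): store it, make a final checkpoint, stop tracking.
  // Null (e.g. on ZOMBIE): stop tracking without checkpointing.
  def removeCheckpointer(shardId: String, cp: Checkpointer): Unit = {
    if (cp != null) {
      setCheckpointer(shardId, cp)
      checkpoint(shardId)
    }
    checkpointers.remove(shardId)
  }

  // Checkpoints only if the latest stored sequence number is newer than the last one written.
  def checkpoint(shardId: String): Unit = {
    val cp = checkpointers.get(shardId)
    if (cp != null) {
      latestStoredSeqNum(shardId).foreach { seqNum =>
        val last = lastCheckpointed.get(shardId)
        // Numeric comparison, assuming decimal sequence number strings.
        if (last == null || BigInt(seqNum) > BigInt(last)) {
          cp.checkpoint(seqNum)
          lastCheckpointed.put(shardId, seqNum)
        }
      }
    }
  }
}
```

With this shape, `checkpoint` has no `Option` parameter to test for `None`; a `null` passed to `removeCheckpointer` simply skips the final checkpoint.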
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44083002 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpoint(shardId, Option(checkpointer)) +checkpointers.remove(shardId) + } + + /** Perform the checkpoint. Exposed for tests. */ + private[kinesis] def checkpoint( + shardId: String, + checkpointer: Option[IRecordProcessorCheckpointer]): Unit = { --- End diff -- Why pass the checkpointer if the `lastCheckpointedSeqNums` already has it?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44086994 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpoint(shardId, Option(checkpointer)) +checkpointers.remove(shardId) + } + + /** Perform the checkpoint. Exposed for tests. */ + private[kinesis] def checkpoint( + shardId: String, + checkpointer: Option[IRecordProcessorCheckpointer]): Unit = { --- End diff -- You mean `checkpointers`? I want `checkpoint` to be called when a record processor shuts down as well, independent of the scheduled checkpoint time. Otherwise we have to keep track of whether the checkpointer for a shardId should be invalidated.
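The shutdown behavior described in this reply can be sketched as follows, assuming hypothetical `Reason` and `ShardCheckpointer` types in place of KCL's `ShutdownReason` and `IRecordProcessorCheckpointer`. The final checkpoint runs synchronously when the processor shuts down, rather than on the next timer tick, and is skipped for a zombie processor whose lease has moved elsewhere. This is an illustration, not the PR's implementation.

```scala
// Hypothetical mirror of the two KCL shutdown reasons discussed in this thread.
sealed trait Reason
case object Terminate extends Reason // shard fully processed and closed: checkpointing is expected
case object Zombie extends Reason    // processing moved to another processor: must not checkpoint

// Hypothetical stand-in for KCL's IRecordProcessorCheckpointer.
trait ShardCheckpointer {
  def checkpoint(seqNum: String): Unit
}

object ShutdownSketch {
  // Makes the final checkpoint during shutdown instead of waiting for the
  // next periodic run, and only when the shard ended normally.
  def onShutdown(reason: Reason, cp: ShardCheckpointer, latestStored: Option[String]): Unit =
    reason match {
      case Terminate => latestStored.foreach(cp.checkpoint)
      case Zombie    => () // another worker may already own this shard
    }
}
```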
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44089550 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + val seqNum = "123" + val otherSeqNum = "245" + val checkpointInterval = Duration(10) + val someSeqNum = Some(seqNum) + val someOtherSeqNum = Some(otherSeqNum) + + var receiverMock: KinesisReceiver[Array[Byte]] = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(anyString()) +checkpointState.checkpoint(shardId, None) +checkpointState.checkpoint(shardId, Option(null)) +// the above two calls should be No-Ops +verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is not called twice for the same sequence number") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + 
+verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is called after sequence number increases") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) + } + + test("should checkpoint if we have exceeded the checkpoint interval") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.setCheckpointer(shardId, checkpointerMock) +Thread.sleep(checkpointInterval.milliseconds * 5) +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) +checkpointState.shutdown() + } + + test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock,
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44090270 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + private val app = "TestKinesisReceiver" + private val stream = "mySparkStream" + private val endpoint = "endpoint-url" + private val workerId = "dummyWorkerId" + private val shardId = "dummyShardId" + private val seqNum = "123" + private val otherSeqNum = "245" + private val checkpointInterval = Duration(10) + private val someSeqNum = Some(seqNum) + private val someOtherSeqNum = Some(otherSeqNum) + + private var receiverMock: KinesisReceiver[Array[Byte]] = _ + private var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { --- End diff -- Nit: don't we generally use `before { }`?
```
[tdas @ Zion spark2] git grep "beforeFunction" | wc -l
5
[tdas @ Zion spark2] git grep "before {" | wc -l
44
```
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154245330 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44083518 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], + * we will use that to make the final checkpoint. If `null` is provided, we will not make the + * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. + */ + def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpoint(shardId, Option(checkpointer)) +checkpointers.remove(shardId) + } + + /** Perform the checkpoint. Exposed for tests. 
*/ + private[kinesis] def checkpoint( + shardId: String, + checkpointer: Option[IRecordProcessorCheckpointer]): Unit = { +// if this method throws an exception, then the scheduled task will not run again +try { + checkpointer.foreach { cp => +receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum => + val lastSeqNum = lastCheckpointedSeqNums.get(shardId) + // Kinesis sequence numbers are monotonically increasing strings, therefore we can do + // safely do the string comparison + if (lastSeqNum == null || latestSeqNum > lastSeqNum) { +/* Perform the checkpoint */ + KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100) +logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" + + s" $latestSeqNum for shardId $shardId") +lastCheckpointedSeqNums.put(shardId, latestSeqNum) + } +} + } +} catch { + case NonFatal(e) => +logError("Failed to checkpoint to DynamoDB.", e) +} + } + + /** + * Start the checkpointer thread with the given checkpoint duration. Exposed for tests. + */ + private[kinesis] def startCheckpointerThread(): ScheduledFuture[_] = { +val period = checkpointInterval.milliseconds +val ex = + ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"Kinesis Checkpointer - Worker $workerId") +val task = new Runnable { + def run() = { +val shardIds =
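The diff notes that an exception escaping the scheduled task stops it from running again, which matches the documented behavior of `ScheduledExecutorService.scheduleAtFixedRate`. A minimal sketch of a periodic per-shard checkpoint loop shows the pattern. It assumes a hypothetical `checkpointShard` callback in place of the PR's `checkpoint` method, stderr in place of Spark's `logWarning`, and a plain JDK executor with a daemon `ThreadFactory` in place of `ThreadUtils.newDaemonSingleThreadScheduledExecutor`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

object PeriodicCheckpointSketch {
  // Starts a daemon thread that calls `checkpointShard` for every shard
  // returned by `currentShards`, once every `periodMs` milliseconds.
  def start(
      currentShards: () => Seq[String],
      periodMs: Long,
      checkpointShard: String => Unit): ScheduledExecutorService = {
    val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "checkpointer-sketch")
        t.setDaemon(true) // do not keep the JVM alive
        t
      }
    })
    val task = new Runnable {
      override def run(): Unit = currentShards().foreach { shardId =>
        // Catch per shard: an exception escaping run() would suppress every
        // later execution, and one failing shard should not block the others.
        try checkpointShard(shardId)
        catch {
          case NonFatal(e) =>
            // Stand-in for logWarning: a later attempt may still succeed.
            System.err.println(s"WARN: failed to checkpoint $shardId: ${e.getMessage}")
        }
      }
    }
    executor.scheduleAtFixedRate(task, periodMs, periodMs, TimeUnit.MILLISECONDS)
    executor
  }
}
```

Because failures are caught inside `run()`, a transient DynamoDB error only costs one period rather than silently disabling checkpointing for the rest of the receiver's life.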
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44083524 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.concurrent._ + +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.util.ThreadUtils + +/** + * This is a helper class for managing Kinesis checkpointing. 
+ * + * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint + * @param checkpointInterval How frequently we will checkpoint to DynamoDB + * @param workerId Worker Id of KCL worker for logging purposes + */ +private[kinesis] class KinesisCheckpointer( +receiver: KinesisReceiver[_], +checkpointInterval: Duration, +workerId: String) extends Logging { + + // a map from shardId's to checkpointers + private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() + + private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() + + private val checkpointerThread = startCheckpointerThread() + + /** Update the checkpointer instance to the most recent one for the given shardId. */ + def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { +checkpointers.put(shardId, checkpointer) + } + + /** + * Stop tracking the specified shardId. + * + * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], --- End diff -- I'm worried that the IRecordProcessorCheckpointers may correspond to the wrong instances during or after resharding. Therefore I would like to use the latest available checkpointer whenever possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
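brkyvz's concern is addressed by looking the checkpointer up per shard at checkpoint time instead of capturing it once, so a checkpointer registered later for the same shardId replaces the stale one. Here is a minimal sketch of that "latest registration wins" pattern using a ConcurrentHashMap. Checkpointer and Registry are hypothetical stand-ins, not the KCL's IRecordProcessorCheckpointer or the class in the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

object LatestCheckpointerSketch {
  /** Hypothetical stand-in for IRecordProcessorCheckpointer. */
  trait Checkpointer { def checkpoint(seqNum: String): Unit }

  final class Registry {
    // shardId -> most recently registered checkpointer for that shard
    private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()

    /** A newer checkpointer for the same shard simply replaces the older one. */
    def setCheckpointer(shardId: String, cp: Checkpointer): Unit = checkpointers.put(shardId, cp)

    /** Looks the checkpointer up at call time, so the latest registration is always used. */
    def checkpoint(shardId: String, seqNum: String): Unit =
      Option(checkpointers.get(shardId)).foreach(_.checkpoint(seqNum))
  }
}
```

Shards with no registered checkpointer are skipped, which matches the Option-based handling in the quoted checkpoint() method.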
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44088418 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + val seqNum = "123" + val otherSeqNum = "245" + val checkpointInterval = Duration(10) + val someSeqNum = Some(seqNum) + val someOtherSeqNum = Some(otherSeqNum) + + var receiverMock: KinesisReceiver[Array[Byte]] = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = --- End diff -- Please rename the variables accordingly; it's not checkpointState any more.
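The tests quoted in this thread pin down two rules of checkpoint(): nothing happens when the checkpointer is None (or Option(null)), and a sequence number is checkpointed only if it is newer than the last one checkpointed for that shard. The following is a small, dependency-free sketch of those rules, exercised the same way as the suite. Checkpointer and Dedup are hypothetical names. One deliberate difference: this sketch compares sequence numbers as BigInt, which stays correct for decimal strings of different lengths, whereas the quoted code compares the strings directly.

```scala
import java.util.concurrent.ConcurrentHashMap

object DedupCheckpointSketch {
  /** Hypothetical stand-in for IRecordProcessorCheckpointer. */
  trait Checkpointer { def checkpoint(seqNum: String): Unit }

  /** `latestSeqNum` plays the role of receiver.getLatestSeqNumToCheckpoint. */
  final class Dedup(latestSeqNum: String => Option[String]) {
    private val lastCheckpointed = new ConcurrentHashMap[String, String]()

    /** Checkpoints only when a checkpointer is present and the sequence number advanced. */
    def checkpoint(shardId: String, checkpointer: Option[Checkpointer]): Unit =
      checkpointer.foreach { cp =>
        latestSeqNum(shardId).foreach { latest =>
          val last = lastCheckpointed.get(shardId)
          // Numeric comparison, so "100" counts as newer than "99".
          if (last == null || BigInt(latest) > BigInt(last)) {
            cp.checkpoint(latest)
            lastCheckpointed.put(shardId, latest)
          }
        }
      }
  }
}
```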
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154241070 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44089287 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + val seqNum = "123" + val otherSeqNum = "245" + val checkpointInterval = Duration(10) + val someSeqNum = Some(seqNum) + val someOtherSeqNum = Some(otherSeqNum) + + var receiverMock: KinesisReceiver[Array[Byte]] = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(anyString()) +checkpointState.checkpoint(shardId, None) +checkpointState.checkpoint(shardId, Option(null)) +// the above two calls should be No-Ops +verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is not called twice for the same sequence number") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + 
+verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is called after sequence number increases") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) + } + + test("should checkpoint if we have exceeded the checkpoint interval") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.setCheckpointer(shardId, checkpointerMock) +Thread.sleep(checkpointInterval.milliseconds * 5) --- End diff -- Do not use Thread.sleep. It causes a lot of flakiness, and we have been actively removing sleeps everywhere.
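A common replacement for a fixed Thread.sleep is to poll for the expected condition until a deadline. A fast run returns as soon as the condition holds, and a slow CI machine gets the whole timeout instead of a guessed delay. ScalaTest's Eventually trait and Mockito's verify(mock, timeout(millis)) provide this kind of waiting. The helper below is a hypothetical, dependency-free sketch of the same idea.

```scala
import java.util.concurrent.TimeUnit

object PollingSketch {
  /**
   * Hypothetical helper: re-evaluates `condition` every `intervalMs` milliseconds until it
   * holds or `timeoutMs` milliseconds have elapsed, and reports whether it ever held.
   */
  def eventually(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    var satisfied = condition
    while (!satisfied && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs) // polling interval only; the deadline bounds the total wait
      satisfied = condition
    }
    satisfied
  }
}
```

In a test like the one quoted, the assertion would wait until the mock has seen both sequence numbers, bounded by a generous timeout, rather than sleeping for five checkpoint intervals and hoping the background thread ran in time.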
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44089562 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" + val stream = "mySparkStream" + val endpoint = "endpoint-url" + val workerId = "dummyWorkerId" + val shardId = "dummyShardId" + val seqNum = "123" + val otherSeqNum = "245" + val checkpointInterval = Duration(10) + val someSeqNum = Some(seqNum) + val someOtherSeqNum = Some(otherSeqNum) + + var receiverMock: KinesisReceiver[Array[Byte]] = _ + var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(anyString()) +checkpointState.checkpoint(shardId, None) +checkpointState.checkpoint(shardId, Option(null)) +// the above two calls should be No-Ops +verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is not called twice for the same sequence number") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + 
+verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is called after sequence number increases") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) + } + + test("should checkpoint if we have exceeded the checkpoint interval") { +when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) + .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.setCheckpointer(shardId, checkpointerMock) +Thread.sleep(checkpointInterval.milliseconds * 5) +verify(checkpointerMock, times(1)).checkpoint(seqNum) +verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) +checkpointState.shutdown() + } + + test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock,
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44090207 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + private val app = "TestKinesisReceiver" + private val stream = "mySparkStream" + private val endpoint = "endpoint-url" + private val workerId = "dummyWorkerId" + private val shardId = "dummyShardId" + private val seqNum = "123" + private val otherSeqNum = "245" + private val checkpointInterval = Duration(10) + private val someSeqNum = Some(seqNum) + private val someOtherSeqNum = Some(otherSeqNum) + + private var receiverMock: KinesisReceiver[Array[Byte]] = _ + private var checkpointerMock: IRecordProcessorCheckpointer = _ + + override def beforeFunction(): Unit = { +receiverMock = mock[KinesisReceiver[Array[Byte]]] +checkpointerMock = mock[IRecordProcessorCheckpointer] + } + + test("checkpoint is not called for None's and nulls") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = + new KinesisCheckpointer(receiverMock, checkpointInterval, workerId) +checkpointState.checkpoint(shardId, Option(checkpointerMock)) + +verify(checkpointerMock, times(1)).checkpoint(anyString()) +checkpointState.checkpoint(shardId, None) +checkpointState.checkpoint(shardId, Option(null)) +// the above two calls should be No-Ops +verify(checkpointerMock, times(1)).checkpoint(anyString()) + } + + test("checkpoint is not called twice for the same sequence number") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) +val checkpointState = --- End diff -- Can't some of this duplicate code be moved into the before block?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154245314 **[Test build #45174 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/consoleFull)** for PR 9421 at commit [`88a67bc`](https://github.com/apache/spark/commit/88a67bc1cecee6ebfba0f00774ba64308b266eef). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154245332 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/ Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44067306 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T]( shardIdToLatestStoredSeqNum.get(shardId) } + /** Pass the setCheckpointer request to the checkpointer. */ + private[kinesis] def setCheckpointerForShardId( --- End diff -- setCheckpointer is sufficient. The Scaladoc should just say "Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB." Whether or not some other checkpointer has a setCheckpointer method is an implementation detail.
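The following is a minimal sketch of the naming tdas suggests: the receiver exposes short setCheckpointer/removeCheckpointer methods and forwards them to a checkpointing helper that callers never see. All names are hypothetical stand-ins, not the Spark or KCL classes. The assertion message mirrors the one in the quoted diff.

```scala
import java.util.concurrent.ConcurrentHashMap

object DelegationSketch {
  /** Hypothetical stand-in for IRecordProcessorCheckpointer. */
  trait Checkpointer { def checkpoint(seqNum: String): Unit }

  /** Hypothetical stand-in for the checkpointing helper owned by the receiver. */
  final class CheckpointHelper {
    private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()
    def setCheckpointer(shardId: String, cp: Checkpointer): Unit = checkpointers.put(shardId, cp)
    def removeCheckpointer(shardId: String): Unit = checkpointers.remove(shardId)
    def isTracking(shardId: String): Boolean = checkpointers.containsKey(shardId)
  }

  final class Receiver {
    @volatile private var helper: CheckpointHelper = null

    def onStart(): Unit = { helper = new CheckpointHelper }

    /** Set the checkpointer that will be used to checkpoint sequence numbers for this shard. */
    def setCheckpointer(shardId: String, cp: Checkpointer): Unit = {
      assert(helper != null, "Kinesis Checkpointer not initialized!")
      helper.setCheckpointer(shardId, cp)
    }

    /** Stop checkpointing sequence numbers for this shard. */
    def removeCheckpointer(shardId: String): Unit = {
      assert(helper != null, "Kinesis Checkpointer not initialized!")
      helper.removeCheckpointer(shardId)
    }

    def isTracking(shardId: String): Boolean = helper != null && helper.isTracking(shardId)
  }
}
```

Callers only see "set the checkpointer for this shard", so the helper can be renamed or restructured without touching the record processor.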
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154188753 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154188755 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/ Test FAILed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44069549 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala --- @@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T]( */ override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") -reason match { +// We want to return a checkpointer based on the shutdown cause. If we want to terminate, --- End diff -- We do remove. But we checkpoint one last time using the most recent checkpointer before removing it from the map.
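The exchange about shutdown() comes down to this: always stop tracking the shard, but checkpoint one final time only when it is safe. In the KCL, ShutdownReason.TERMINATE means the shard has been fully consumed (for example, closed by resharding), while ZOMBIE means the worker lost its lease and should not checkpoint, since another worker may now own the shard. The sketch below models that decision with a hypothetical Reason type standing in for the KCL enum. It is an illustration, not the code in the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

object ShutdownSketch {
  /** Hypothetical stand-in for the KCL ShutdownReason enum. */
  sealed trait Reason
  case object Terminate extends Reason // shard fully consumed, e.g. closed by resharding
  case object Zombie extends Reason    // lease lost; another worker may own the shard now

  /** Hypothetical stand-in for IRecordProcessorCheckpointer. */
  trait Checkpointer { def checkpoint(seqNum: String): Unit }

  final class Tracker(latestSeqNum: String => Option[String]) {
    private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()

    def setCheckpointer(shardId: String, cp: Checkpointer): Unit = checkpointers.put(shardId, cp)
    def isTracking(shardId: String): Boolean = checkpointers.containsKey(shardId)

    /** Checkpoint one last time if a checkpointer is given, then always stop tracking. */
    def removeCheckpointer(shardId: String, finalCheckpointer: Option[Checkpointer]): Unit = {
      finalCheckpointer.foreach(cp => latestSeqNum(shardId).foreach(cp.checkpoint))
      checkpointers.remove(shardId)
    }

    def shutdown(shardId: String, cp: Checkpointer, reason: Reason): Unit = reason match {
      case Terminate => removeCheckpointer(shardId, Some(cp))
      case Zombie    => removeCheckpointer(shardId, None) // never checkpoint without the lease
    }
  }
}
```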
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44065511 --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds} + +class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter { + + val app = "TestKinesisReceiver" --- End diff -- Can you make all of them private?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44065382 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala --- @@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T]( */ override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") -reason match { +// We want to return a checkpointer based on the shutdown cause. If we want to terminate, --- End diff -- I don't get this. Don't we always want to remove the checkpointer when shutdown() is called, irrespective of the reason for shutdown?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44077155 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala --- @@ -27,26 +27,23 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record import org.apache.spark.Logging +import org.apache.spark.streaming.Duration /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. * This implementation operates on the Array[Byte] from the KinesisReceiver. * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each - * shard in the Kinesis stream upon startup. This is normally done in separate threads, - * but the KCLs within the KinesisReceivers will balance themselves out if you create - * multiple Receivers. + * shard in the Kinesis stream upon startup. This is normally done in separate threads, + * but the KCLs within the KinesisReceivers will balance themselves out if you create + * multiple Receivers. * * @param receiver Kinesis receiver * @param workerId for logging purposes - * @param checkpointState represents the checkpoint state including the next checkpoint time. - * It's injected here for mocking purposes. */ -private[kinesis] class KinesisRecordProcessor[T]( -receiver: KinesisReceiver[T], -workerId: String, -checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging { +private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String) + extends IRecordProcessor with Logging { - // shardId to be populated during initialize() + // shardId and checkpointState to be populated during initialize() --- End diff -- this is irrelevant now.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-154188680 **[Test build #45139 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/consoleFull)** for PR 9421 at commit [`457c6c6`](https://github.com/apache/spark/commit/457c6c689a5b309137f7a91f4466c4bac19e19f0). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r44076396 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T]( shardIdToLatestStoredSeqNum.get(shardId) } + /** Pass the setCheckpointer request to the checkpointer. */ + private[kinesis] def setCheckpointerForShardId( + shardId: String, + checkpointer: IRecordProcessorCheckpointer): Unit = { +assert(getCheckpointer() != null, "Kinesis Checkpointer not initialized!") +getCheckpointer().setCheckpointer(shardId, checkpointer) + } + + /** Pass the removeCheckpointer request to the checkpointer. */ --- End diff -- Same comment as above
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43967600

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock. Default to current SystemClock if none is passed in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+    receiver: KinesisReceiver[T],
     checkpointInterval: Duration,
-    currentClock: Clock = new SystemClock())
-  extends Logging {
+    workerId: String,
+    shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the derived time
-   * for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-    new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+    _checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+    // if this method throws an exception, then the scheduled task will not run again
+    try {
+      checkpointer.foreach { cp =>
--- End diff --

It should be okay.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43857597

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+    receiver: KinesisReceiver[T],
--- End diff --

You don't need T here. You can take the reference to the receiver as KinesisReceiver[_].
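A minimal sketch of the reviewer's suggestion, using hypothetical stand-ins (`Receiver`, `Checkpointer`) rather than the real `KinesisReceiver` and `KinesisCheckpointState`. The assumption is that the helper only calls receiver methods whose signatures do not mention `T` (here `getLatestSeqNumToCheckpoint` is assumed to return `Option[String]`), so it can hold the receiver as the existential type `Receiver[_]` and drop its own type parameter.

```scala
// Hypothetical stand-in for KinesisReceiver[T]. The method the checkpointer needs
// does not mention T in its signature (its return type is assumed to be Option[String]).
class Receiver[T](latestSeqNums: Map[String, String]) {
  def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = latestSeqNums.get(shardId)
}

// Hypothetical non-generic helper: Receiver[_] accepts a receiver of any element type.
class Checkpointer(receiver: Receiver[_], shardId: String) {
  def latestSeqNum: Option[String] = receiver.getLatestSeqNumToCheckpoint(shardId)
}

// Call sites no longer pass a type argument to the helper.
val forStrings = new Checkpointer(new Receiver[String](Map("shard-0" -> "12345")), "shard-0")
val forBytes = new Checkpointer(new Receiver[Array[Byte]](Map.empty), "shard-1")
```

The same non-generic class serves receivers of `String`, `Array[Byte]`, or any other record type, which is why the `[T]` at the construction site in `KinesisRecordProcessor` becomes unnecessary.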
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43857672

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+    receiver: KinesisReceiver[T],
--- End diff --

Could also rename this to KinesisCheckpointer, to better reflect what it does.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43857784

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -57,6 +58,7 @@ private[kinesis] class KinesisRecordProcessor[T](
    */
   override def initialize(shardId: String) {
     this.shardId = shardId
+    checkpointState = new KinesisCheckpointState[T](receiver, checkpointInterval, workerId, shardId)
--- End diff --

Don't need to pass on T.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43857174

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+    // if this method throws an exception, then the scheduled task will not run again
+    try {
+      checkpointer.foreach { cp =>
+        receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+          /* Perform the checkpoint */
+          KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100)
+
--- End diff --

nit: extra line
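The checkpoint call in this diff wraps the KCL write in `KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100)`, whose body is not shown in this thread. As a rough illustration of the general idea only, here is a hypothetical retry-with-random-backoff helper (`Retry.withRandomBackoff` is an invented name, not Spark's method); which exceptions count as retryable is left to a caller-supplied predicate because the real rules are an assumption here.

```scala
import scala.util.Random

// Hypothetical helper, NOT Spark's KinesisRecordProcessor.retryRandom.
object Retry {
  // Evaluates `expression`; if it throws a retryable exception and retries remain,
  // sleeps a random 0..maxBackOffMillis ms and tries again, otherwise rethrows.
  def withRandomBackoff[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int)(
      isRetryable: Throwable => Boolean): T = {
    try {
      expression
    } catch {
      case e: Exception if numRetriesLeft > 0 && isRetryable(e) =>
        Thread.sleep(Random.nextInt(maxBackOffMillis + 1).toLong)
        withRandomBackoff(expression, numRetriesLeft - 1, maxBackOffMillis)(isRetryable)
    }
  }
}
```

Because `expression` is a by-name parameter, each retry re-evaluates the wrapped call (here, the checkpoint write) instead of reusing a cached result.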
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43859706

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+    _checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
--- End diff --

Why pass the checkpointer as an Option when the class already has a reference to it in `_checkpointer`?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153673079

There is a general problem that bugs me. I am not sure whether the KCL guarantees that `IRecordProcessor.shutdown()` will be called when the worker is shut down. From the KCL source code, it seems that `shutdown` is only called when the corresponding processor loses the shard lease or the shard is terminated due to resharding. So we cannot guarantee that the shutdown of the `KinesisCheckpointState` will be called when the receiver stops.

For that to happen, the design needs to change. There should be a single, central `KinesisCheckpointer` in the `KinesisReceiver` that is responsible for checkpointing all the shards. All the `KinesisRecordProcessor`s register and deregister their `IRecordProcessorCheckpointer` with the `KinesisCheckpointer` for their `shardId`. For each shardId, the `KinesisCheckpointer` will periodically try to call `KinesisRecordProcessor.checkpoint()` with that shard's latest sequence number from `shardIdTolatestStoreSeqNumber`. As an optimization, it will keep track of the last saved sequence number for each shardId and call checkpoint only if it has changed. Finally, when the receiver is stopped, `KinesisCheckpointer.shutdown` will do a final round of checkpointing.
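A minimal sketch of the proposed design, under stated assumptions: `ShardCheckpointer` is a hypothetical stand-in for KCL's `IRecordProcessorCheckpointer`, `CentralCheckpointer` is a hypothetical name for the proposed `KinesisCheckpointer`, and the function passed to its constructor plays the role of the receiver's `shardIdTolatestStoreSeqNumber` lookup. The periodic trigger is left out; any scheduler could call `checkpointAll()` once per checkpoint interval.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KCL's IRecordProcessorCheckpointer.
trait ShardCheckpointer {
  def checkpoint(sequenceNumber: String): Unit
}

// Sketch of a single checkpointer owned by the receiver, covering all shards.
class CentralCheckpointer(latestSeqNum: String => Option[String]) {
  private val checkpointers = mutable.Map.empty[String, ShardCheckpointer]
  private val lastCheckpointed = mutable.Map.empty[String, String]

  /** Called by a record processor when it starts (or receives a new checkpointer). */
  def register(shardId: String, cp: ShardCheckpointer): Unit = synchronized {
    checkpointers(shardId) = cp
  }

  /** Called by a record processor when it gives up the shard. */
  def deregister(shardId: String): Unit = synchronized {
    checkpointers.remove(shardId)
    lastCheckpointed.remove(shardId)
  }

  /** One periodic round: checkpoint every registered shard whose sequence number moved. */
  def checkpointAll(): Unit = synchronized {
    checkpointers.foreach { case (shardId, cp) => checkpointShard(shardId, cp) }
  }

  /** Final round when the receiver stops, independent of IRecordProcessor.shutdown(). */
  def shutdown(): Unit = synchronized {
    checkpointAll()
    checkpointers.clear()
  }

  private def checkpointShard(shardId: String, cp: ShardCheckpointer): Unit = {
    latestSeqNum(shardId).foreach { seq =>
      // Skip the DynamoDB write when nothing changed since the last checkpoint.
      if (!lastCheckpointed.get(shardId).contains(seq)) {
        cp.checkpoint(seq)
        lastCheckpointed(shardId) = seq
      }
    }
  }
}
```

Keeping all state behind one lock trades some concurrency for simplicity; since a round runs only once per checkpoint interval, contention should usually be low.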
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43817473

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+    // if this method throws an exception, then the scheduled task will not run again
+    try {
+      checkpointer.foreach { cp =>
--- End diff --

What will happen if we use an old `IRecordProcessorCheckpointer` to checkpoint?
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43815388

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+    // if this method throws an exception, then the scheduled task will not run again
+    try {
+      checkpointer.foreach { cp =>
+        receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+          /* Perform the checkpoint */
+          KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100)
+
+          logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" +
+            s" $latestSeqNum for shardId $shardId")
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        logError("Failed to checkpoint to DynamoDB.", e)
+    }
+  }
+
+  /** Start the checkpointer thread with the given checkpoint duration. */
+  private def startCheckpointerThread(): ScheduledFuture[_] = {
+    val period = checkpointInterval.milliseconds
+    val ex = new ScheduledThreadPoolExecutor(1)
--- End diff --

This needs to be shut down. Otherwise, it will leak a thread. BTW, it's better to give the thread a name and use daemon threads. You can use `ThreadUtils.newDaemonSingleThreadScheduledExecutor` to create a named one.
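A plain-JDK sketch of the reviewer's two points, assuming nothing from Spark: the scheduler runs on a single named daemon thread, and `shutdown()` stops it so the thread is not leaked. It also illustrates why the diff wraps the checkpoint in `try`/`catch NonFatal`: per the `ScheduledExecutorService.scheduleAtFixedRate` contract, once one execution throws, subsequent executions are suppressed. `PeriodicCheckpointer` and `doCheckpoint` are hypothetical names.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical periodic checkpointer; `doCheckpoint` stands in for the real DynamoDB write.
class PeriodicCheckpointer(periodMillis: Long, doCheckpoint: () => Unit) {
  // Plain-JDK approximation of one named daemon thread. Inside Spark, the suggested
  // ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) covers this.
  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "Kinesis Checkpointer")
      t.setDaemon(true) // a daemon thread does not keep the JVM alive
      t
    }
  })

  private val task = new Runnable {
    override def run(): Unit = {
      try {
        doCheckpoint()
      } catch {
        // If one run of a scheduleAtFixedRate task throws, all later runs are
        // suppressed, so failures are reported here instead of escaping.
        case NonFatal(e) => System.err.println(s"Failed to checkpoint: ${e.getMessage}")
      }
    }
  }

  executor.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS)

  /** Stops the thread so it is not leaked when the receiver stops. */
  def shutdown(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

By default, `shutdown()` also cancels the pending periodic runs, so after `awaitTermination` returns, no further checkpoints fire.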
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/9421#discussion_r43817494

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala ---
@@ -16,39 +16,77 @@
[...]
+    workerId: String,
+    shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
--- End diff --

This should be `volatile`.
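A minimal sketch of why the field should be `@volatile`, with hypothetical names (`ShardCheckpointer` stands in for KCL's `IRecordProcessorCheckpointer`; `CheckpointState` for the class under review). The field is written by the KCL record-processor thread and read by the checkpointer thread. Without `@volatile` (or a lock) the JVM does not guarantee that the reader ever sees the latest write. The sketch also reads the field directly in `checkpoint()` rather than taking it as a parameter.

```scala
// Hypothetical stand-in for KCL's IRecordProcessorCheckpointer.
trait ShardCheckpointer { def checkpoint(sequenceNumber: String): Unit }

class CheckpointState(latestSeqNum: () => Option[String]) {
  // Written by the record-processor thread, read by the checkpointer thread;
  // @volatile makes each write visible to subsequent reads on other threads.
  @volatile private var _checkpointer: Option[ShardCheckpointer] = None

  def setCheckpointer(cp: ShardCheckpointer): Unit = {
    _checkpointer = Option(cp) // Option(null) == None
  }

  // Reads the field once into a local, so one run uses a single consistent
  // checkpointer even if setCheckpointer replaces it concurrently.
  def checkpoint(): Unit = {
    val current = _checkpointer
    for (cp <- current; seq <- latestSeqNum()) cp.checkpoint(seq)
  }
}
```

A single volatile reference is enough here because the whole `Option` is swapped atomically; no compound read-modify-write happens on the field.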
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153171076 Merged build started.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153171035 Merged build triggered.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153171831 **[Test build #44839 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/consoleFull)** for PR 9421 at commit [`feb69ba`](https://github.com/apache/spark/commit/feb69bacd568c8a8a8112d11981c626482d4af1a).
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/9421

[SPARK-11359][STREAMING][KINESIS] Checkpoint to DynamoDB even when new data doesn't come in

Currently, checkpoints to DynamoDB occur only when new data comes in, because the clock in `checkpointState` is only checked and advanced when records are processed. This PR makes checkpointing a scheduled task that runs every `checkpointInterval`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brkyvz/spark kinesis-checkpoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9421.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #9421

commit feb69bacd568c8a8a8112d11981c626482d4af1a
Author: Burak Yavuz
Date: 2015-11-02T21:56:39Z

    update Kinesis checkpointing
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153181727 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153181593

**[Test build #44839 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/consoleFull)** for PR 9421 at commit [`feb69ba`](https://github.com/apache/spark/commit/feb69bacd568c8a8a8112d11981c626482d4af1a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/9421#issuecomment-153181731

Test PASSed.
Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/