[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155212619
  
**[Test build #45421 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45421/consoleFull)** for PR 9421 at commit [`63d2e52`](https://github.com/apache/spark/commit/63d2e5284034c899b5ae0ccdb6e3e1ac5c022f1c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155214151
  
LGTM





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155209777
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155156521
  
One high-level question: in the previous code, checkpointing for multiple 
shards ran in parallel, but now it runs sequentially. Does it matter?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44330530
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent.{TimeoutException, ExecutorService}
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase}
+import org.apache.spark.util.ManualClock
+
+class KinesisCheckpointerSuite extends TestSuiteBase
+  with MockitoSugar
+  with BeforeAndAfterEach
+  with PrivateMethodTester
+  with Eventually {
+
+  private val workerId = "dummyWorkerId"
+  private val shardId = "dummyShardId"
+  private val seqNum = "123"
+  private val otherSeqNum = "245"
+  private val checkpointInterval = Duration(10)
+  private val someSeqNum = Some(seqNum)
+  private val someOtherSeqNum = Some(otherSeqNum)
+
+  private var receiverMock: KinesisReceiver[Array[Byte]] = _
+  private var checkpointerMock: IRecordProcessorCheckpointer = _
+  private var kinesisCheckpointer: KinesisCheckpointer = _
+  private var clock: ManualClock = _
+
+  private val checkpoint = PrivateMethod[Unit]('checkpoint)
+
+  override def beforeEach(): Unit = {
+    receiverMock = mock[KinesisReceiver[Array[Byte]]]
+    checkpointerMock = mock[IRecordProcessorCheckpointer]
+    clock = new ManualClock()
+    kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock)
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+    verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is called after sequence number increases") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+    verify(checkpointerMock, times(1)).checkpoint(seqNum)
+    verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+    clock.advance(5 * checkpointInterval.milliseconds)
+
+    eventually(timeout(1 second)) {
+      verify(checkpointerMock, times(1)).checkpoint(seqNum)
+      verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+    }
+  }
+
+  test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+    clock.advance(checkpointInterval.milliseconds / 2)
+
+
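
The first two tests quoted above check that a sequence number is checkpointed at most once per shard, and that a newer sequence number is checkpointed again. A minimal sketch of that behaviour follows. It is not the PR's implementation: `SeqNumCheckpointer`, `RecordingCheckpointer` and `DedupingCheckpointer` are hypothetical stand-ins, and a plain mutable map is assumed in place of the `ConcurrentHashMap` the real class uses.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the KCL's IRecordProcessorCheckpointer.
trait SeqNumCheckpointer {
  def checkpoint(seqNum: String): Unit
}

// Records every checkpoint call so the behaviour can be inspected.
final class RecordingCheckpointer extends SeqNumCheckpointer {
  val calls = mutable.ArrayBuffer.empty[String]
  override def checkpoint(seqNum: String): Unit = {
    calls += seqNum
    ()
  }
}

// Sketch only: checkpoints a shard's latest sequence number unless it is
// the same value that was checkpointed last time for that shard.
final class DedupingCheckpointer(latestSeqNum: String => Option[String]) {
  private val lastCheckpointed = mutable.Map.empty[String, String]

  def checkpoint(shardId: String, cp: SeqNumCheckpointer): Unit = {
    latestSeqNum(shardId).foreach { seqNum =>
      if (!lastCheckpointed.get(shardId).contains(seqNum)) {
        cp.checkpoint(seqNum)
        lastCheckpointed(shardId) = seqNum
      }
    }
  }
}
```

Calling `checkpoint` twice while the source keeps returning the same sequence number should reach the underlying checkpointer only once, which is the property the mock-based test asserts with `times(1)`.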

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155191287
  
LGTM except for a suggestion about the flaky test.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread brkyvz
Github user brkyvz commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155167933
  
@zsxwing It doesn't matter. Since we always checkpoint the latest value we 
can checkpoint, it doesn't matter if it is sequential or in parallel





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155171992
  
> @zsxwing It doesn't matter. Since we always checkpoint the latest value 
we can checkpoint, it doesn't matter if it is sequential or in parallel

I didn't mean correctness. I was wondering whether the performance loss is 
acceptable.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread brkyvz
Github user brkyvz commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155175426
  
@zsxwing It should be acceptable as well. Think about it like this.
We have 2 receivers, A and B:
t0 -> A receives a batch with seq number x_0, B receives a batch with seq number y_0
t1 -> A receives a batch with seq number x_1
t2 -> B receives a batch with seq number y_1

In the parallel case, if we were to checkpoint at t1 + epsilon, we would 
checkpoint x_1 for A and y_0 for B.

In the sequential case, assume we checkpoint at t1 + epsilon for A and at 
t2 + epsilon for B, since each checkpoint takes some time. Then we would 
checkpoint x_1 for A and y_1 for B.

Performance-wise it shouldn't be a problem, since checkpointing to DynamoDB 
is a "best effort attempt".
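
The timeline above can be replayed with a small sketch. Everything in it is an assumption made for illustration: the object name `CheckpointTimeline`, the numeric times (t0 = 0, t1 = 1, t2 = 2), and the value of `epsilon` are not taken from the PR.

```scala
object CheckpointTimeline {
  // (arrival time, sequence number) per receiver, mirroring the timeline
  // in the comment with t0 = 0, t1 = 1, t2 = 2 (assumed values).
  val arrivals: Map[String, Seq[(Double, String)]] = Map(
    "A" -> Seq(0.0 -> "x_0", 1.0 -> "x_1"),
    "B" -> Seq(0.0 -> "y_0", 2.0 -> "y_1"))

  // Latest sequence number that has arrived by `time`, if any.
  def latestAt(receiver: String, time: Double): Option[String] =
    arrivals(receiver).filter(_._1 <= time).lastOption.map(_._2)

  // Checkpoint each receiver at its given time and report what is written.
  def checkpointAt(times: Map[String, Double]): Map[String, Option[String]] =
    times.map { case (receiver, time) => receiver -> latestAt(receiver, time) }

  val epsilon = 0.1

  // Parallel: both receivers are checkpointed at t1 + epsilon.
  val parallel = checkpointAt(Map("A" -> (1.0 + epsilon), "B" -> (1.0 + epsilon)))

  // Sequential: B is checkpointed later, at t2 + epsilon.
  val sequential = checkpointAt(Map("A" -> (1.0 + epsilon), "B" -> (2.0 + epsilon)))
}
```

In this toy model the sequential run writes a sequence number for B that is at least as recent as the parallel one, because each checkpoint always takes the latest value available at the moment it runs.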





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44324452
  

 

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44324123
  

 

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155221046
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155220893
  
**[Test build #45421 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45421/consoleFull)** for PR 9421 at commit [`63d2e52`](https://github.com/apache/spark/commit/63d2e5284034c899b5ae0ccdb6e3e1ac5c022f1c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155209804
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-155221200
  
Perfect. Merging this to master and branch 1.6. Thanks @brkyvz for all the 
effort, and @zsxwing for reviewing.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/9421





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44235736
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T](
    */
   override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
     logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
-    reason match {
+    // We want to return a checkpointer based on the shutdown cause. If we want to terminate,
+    // then we want to use the given checkpointer to checkpoint one last time. In other cases,
+    // we return a `null` so that we don't checkpoint one last time.
+    val cp: IRecordProcessorCheckpointer = reason match {
--- End diff --

Can this code just be simplified to the following?
```
reason match {
  case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, cp)
  case _ => receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
}
```
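
For readers without the KCL on the classpath, the shape of the suggested match can be tried in isolation. This is only a sketch under stated assumptions: `Reason`, `Terminate`, `Zombie`, `Checkpointer` and the recording `removeCheckpointer` are hypothetical stand-ins, and `Option` is swapped in for the `null` used in the suggestion.

```scala
object ShutdownSketch {
  // Hypothetical stand-ins for the KCL types referenced in the diff.
  sealed trait Reason
  case object Terminate extends Reason
  case object Zombie extends Reason
  trait Checkpointer

  // Records what would be handed to removeCheckpointer, newest first.
  // None plays the role of the `null` in the suggestion above.
  var removed: List[(String, Option[Checkpointer])] = Nil

  def removeCheckpointer(shardId: String, cp: Option[Checkpointer]): Unit = {
    removed = (shardId, cp) :: removed
  }

  // Pass the checkpointer on only for a clean termination, so that a final
  // checkpoint is made; in every other case pass nothing.
  def shutdown(shardId: String, checkpointer: Checkpointer, reason: Reason): Unit = {
    reason match {
      case Terminate => removeCheckpointer(shardId, Some(checkpointer))
      case _ => removeCheckpointer(shardId, None)
    }
  }
}
```

Matching directly on the reason avoids the intermediate `val cp`, which is the simplification being asked for.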






[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44235617
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+    receiver: KinesisReceiver[_],
+    checkpointInterval: Duration,
+    workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],

The reasoning in this comment is no longer relevant. Please update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44235599
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param clock In order to use ManualClocks for the purpose of testing
+ */
+private[kinesis] class KinesisCheckpointer(
+    receiver: KinesisReceiver[_],
+    checkpointInterval: Duration,
+    workerId: String,
+    clock: Clock = new SystemClock) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
+
+  private val checkpointerThread: RecurringTimer = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit =
--- End diff --

nit: Add braces around the method.
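
For illustration, the nit amounts to wrapping the method body in braces rather than letting a single expression follow the `=`. The sketch below uses a hypothetical `Checkpointer` trait in place of the KCL interface, and the method body is an assumption, since the real body is not shown in this diff:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the KCL's IRecordProcessorCheckpointer.
trait Checkpointer { def checkpoint(): Unit }

class BracesExample {
  private val checkpointers = new ConcurrentHashMap[String, Checkpointer]()

  def setCheckpointer(shardId: String, checkpointer: Checkpointer): Unit = {
    checkpointers.put(shardId, checkpointer)
  }

  // Without braces the signature would end in `: Unit =` followed directly by
  // `synchronized { ... }`. With braces, the method body is delimited explicitly.
  def removeCheckpointer(shardId: String, checkpointer: Checkpointer): Unit = {
    synchronized {
      checkpointers.remove(shardId)
      // Assumption for this sketch: a non-null checkpointer means
      // "make one final checkpoint".
      if (checkpointer != null) checkpointer.checkpoint()
    }
  }

  def isTracking(shardId: String): Boolean = checkpointers.containsKey(shardId)
}
```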


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154901716
  
Overall it looks good. Just a few minor comments.
@zsxwing Does this look good to you?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44235801
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -217,6 +226,29 @@ private[kinesis] class KinesisReceiver[T](
   }
 
   /**
+   * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
+   * given shardId.
+   */
+  private[kinesis] def setCheckpointer(
+      shardId: String,
+      checkpointer: IRecordProcessorCheckpointer): Unit = {
+    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Remove the checkpointer for the given shardId. The provided checkpointer will be used to
+   * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not
+   * checkpoint.
+   */
+  private[kinesis] def removeCheckpointer(
--- End diff --

Same issue as above.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44235797
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -217,6 +226,29 @@ private[kinesis] class KinesisReceiver[T](
   }
 
   /**
+   * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
+   * given shardId.
+   */
+  private[kinesis] def setCheckpointer(
--- End diff --

The whole class is already `private[kinesis]`, so the modifier on this method is
redundant. It's not a big deal really, but it causes a bit of confusion while
reading the code, as if the whole class were public. Please be aware of such
issues elsewhere.
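
A small sketch of the point, under the assumption that an enclosing object named `kinesisdemo` (hypothetical) plays the role of the `kinesis` package: once the class itself is restricted, repeating the qualifier on a member does not narrow access any further in practice, because code outside the scope cannot name the class in the first place.

```scala
object kinesisdemo {
  // The class is already restricted to the enclosing scope.
  private[kinesisdemo] class Receiver {
    // Redundant qualifier: it suggests to a reader that the class is public
    // and only this member is restricted.
    private[kinesisdemo] def setCheckpointer(shardId: String): String =
      s"set $shardId"

    // No qualifier: still only usable where Receiver itself can be named.
    def removeCheckpointer(shardId: String): String =
      s"removed $shardId"
  }

  // Both members are used the same way from inside the scope.
  def demo(): (String, String) = {
    val r = new Receiver
    (r.setCheckpointer("shard-0"), r.removeCheckpointer("shard-0"))
  }
}
```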





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154931538
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154931548
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154944163
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154943927
  
**[Test build #45344 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45344/consoleFull)** for PR 9421 at commit [`f29814c`](https://github.com/apache/spark/commit/f29814c2e65ae434a7ef20977d2401845d5a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154932101
  
**[Test build #45344 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45344/consoleFull)** for PR 9421 at commit [`f29814c`](https://github.com/apache/spark/commit/f29814c2e65ae434a7ef20977d2401845d5a).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154513836
  
Build started sha1 is merged.






[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154522193
  
**[Test build #45241 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45241/consoleFull)** for PR 9421 at commit [`36ac43e`](https://github.com/apache/spark/commit/36ac43e5c4f06a4a8e173af7b1e8e1038d214323).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154522342
  

Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45241/






[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154522341
  
Build finished. 168 tests run, 0 skipped, 0 failed.






[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154167783
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154167741
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154173376
  
**[Test build #45139 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/consoleFull)** for PR 9421 at commit [`457c6c6`](https://github.com/apache/spark/commit/457c6c689a5b309137f7a91f4466c4bac19e19f0).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168797
  
**[Test build #45137 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/consoleFull)** for PR 9421 at commit [`95e366c`](https://github.com/apache/spark/commit/95e366c5f5284ff39075bb9c530cf0f8f7180c8a).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168895
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168710
  
**[Test build #45134 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/consoleFull)** for PR 9421 at commit [`e0cabf7`](https://github.com/apache/spark/commit/e0cabf7572fdd293a45ce765befde711ad5dea6e).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154172998
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154173038
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154165611
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154165535
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168885
  
**[Test build #45134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/consoleFull)** for PR 9421 at commit [`e0cabf7`](https://github.com/apache/spark/commit/e0cabf7572fdd293a45ce765befde711ad5dea6e).
 * This patch **fails RAT tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168946
  
**[Test build #45137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/consoleFull)** for PR 9421 at commit [`95e366c`](https://github.com/apache/spark/commit/95e366c5f5284ff39075bb9c530cf0f8f7180c8a).
 * This patch **fails RAT tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168897
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45134/





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168952
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45137/





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154168950
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44088617
  
--- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "123"
+  val otherSeqNum = "245"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+  val someOtherSeqNum = Some(otherSeqNum)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+    receiverMock = mock[KinesisReceiver[Array[Byte]]]
+    checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
--- End diff --

Why even have `Option`s in the signature, such that you have to test for them?
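
To make the question concrete, here is a hedged sketch of the two shapes. The original signature is not shown in this diff, so `checkpointOpt` is an assumed Option-heavy form used only for contrast, and `SeqCheckpointer` is a hypothetical stand-in for the KCL interface:

```scala
// Hypothetical stand-in for the KCL checkpointer interface.
trait SeqCheckpointer { def checkpoint(seqNum: String): Unit }

object OptionFree {
  // Option-heavy shape (assumed): every caller and every test has to cover
  // the None cases, even if they never occur in practice.
  def checkpointOpt(cp: Option[SeqCheckpointer], seqNum: Option[String]): Boolean = {
    (cp, seqNum) match {
      case (Some(c), Some(s)) =>
        c.checkpoint(s)
        true
      case _ =>
        false
    }
  }

  // Option-free shape: the caller invokes this only when it has both values,
  // so there is no None branch left to test.
  def checkpoint(cp: SeqCheckpointer, seqNum: String): Unit = {
    cp.checkpoint(seqNum)
  }
}
```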





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154241512
  
**[Test build #45174 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/consoleFull)** for PR 9421 at commit [`88a67bc`](https://github.com/apache/spark/commit/88a67bc1cecee6ebfba0f00774ba64308b266eef).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154240332
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154246737
  
Overall there are a few major suggestions.
- To test the timer logic in `KinesisCheckpointer`, it's best to refactor
`KinesisCheckpointer` to use `streaming.util.RecurringTimer` instead of a
scheduled executor service, so that it can be tested with a manual clock.
- Some refactoring of the `KinesisCheckpointer` internal logic to remove
unnecessary options and to simplify the shutdown logic.
- Some minor test refactoring.
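
The idea behind the first suggestion can be sketched without Spark. This is not `RecurringTimer` or Spark's `ManualClock`; `SimpleClock`, `SimpleManualClock`, and `PeriodicTrigger` are hypothetical names for a self-contained illustration of why an injected clock makes periodic logic deterministic to test:

```scala
// Hypothetical minimal clock abstraction; Spark's own Clock is not used here.
trait SimpleClock { def now(): Long }

// A clock that only moves when the test tells it to.
class SimpleManualClock(private var time: Long = 0L) extends SimpleClock {
  def now(): Long = time
  def advance(ms: Long): Unit = { time += ms }
}

// Fires `callback` once for every full period that has elapsed on the clock.
class PeriodicTrigger(clock: SimpleClock, periodMs: Long, callback: Long => Unit) {
  private var nextTime = clock.now() + periodMs

  // Run all callbacks that are due and return how many fired.
  def poll(): Int = {
    var fired = 0
    while (clock.now() >= nextTime) {
      callback(nextTime)
      nextTime += periodMs
      fired += 1
    }
    fired
  }
}
```

With a scheduled executor the test would have to sleep and hope the callback ran. With an injected clock the test advances time explicitly and asserts on exactly which ticks fired.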





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44082707
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+    receiver: KinesisReceiver[_],
+    checkpointInterval: Duration,
+    workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
--- End diff --

Why do we need to do two different things? Unlike the usual 
IRecordProcessor implementations, which checkpoint as soon as the data is 
received, we checkpoint only what has been received AND stored reliably in 
Spark. If some data has already been stored, then isn't it strictly better to 
write the corresponding offset to DynamoDB in any condition?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44083492
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, 
we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpoint(shardId, Option(checkpointer))
+checkpointers.remove(shardId)
+  }
+
+  /** Perform the checkpoint. Exposed for tests. */
+  private[kinesis] def checkpoint(
+  shardId: String,
+  checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
+receiver.getLatestSeqNumToCheckpoint(shardId).foreach { 
latestSeqNum =>
+  val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
+  // Kinesis sequence numbers are monotonically increasing 
strings, therefore we can do
+  // safely do the string comparison
+  if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
+/* Perform the checkpoint */
+
KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100)
+logDebug(s"Checkpoint:  WorkerId $workerId completed 
checkpoint at sequence number" +
+  s" $latestSeqNum for shardId $shardId")
+lastCheckpointedSeqNums.put(shardId, latestSeqNum)
+  }
+}
+  }
+} catch {
+  case NonFatal(e) =>
+logError("Failed to checkpoint to DynamoDB.", e)
--- End diff --

This should be a `logWarning`. It's not a serious error, as future attempts 
may succeed. Task failures are likewise logged as warnings because they are 
going to be reattempted; only job failures are logged as errors.
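
A minimal sketch of the suggestion above, assuming the periodic task simply swallows non-fatal exceptions so that it can run again. `SketchLogging` and `PeriodicCheckpointSketch` are hypothetical stand-ins so the snippet runs without Spark; in the real class the call would go to the `logWarning(msg, throwable)` method of Spark's `Logging` trait.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's Logging trait; it records messages
// in memory instead of writing to a real logger.
trait SketchLogging {
  val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def logWarning(msg: => String, e: Throwable): Unit = {
    messages += s"WARN $msg (${e.getMessage})"
  }
}

// Hypothetical wrapper around one scheduled checkpoint attempt.
class PeriodicCheckpointSketch extends SketchLogging {

  /** Runs one attempt and never lets a non-fatal exception escape. */
  def attempt(doCheckpoint: () => Unit): Unit = {
    try {
      doCheckpoint()
    } catch {
      case NonFatal(e) =>
        // A later scheduled attempt may still succeed, so this is
        // reported as a warning rather than an error.
        logWarning("Failed to checkpoint to DynamoDB.", e)
    }
  }
}
```

Because the exception is caught inside `attempt`, the scheduled task keeps running, which is what makes the failure recoverable and a warning appropriate.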



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44086804
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T](
 shardIdToLatestStoredSeqNum.get(shardId)
   }
 
+  /** Pass the setCheckpointer request to the checkpointer. */
+  private[kinesis] def setCheckpointerForShardId(
--- End diff --

Since I already have a `getCheckpointer` method, I didn't want the two to 
look like a matching getter/setter pair. `getCheckpointer` returns the central 
`checkpointer`, but `setCheckpointer` is for a specific shardId. Don't you 
think that would be confusing?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154240335
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45173/
Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154240258
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154240324
  
**[Test build #45173 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45173/consoleFull)**
 for PR 9421 at commit 
[`79f47ec`](https://github.com/apache/spark/commit/79f47ec116bd8c17a7ab7266c90c186ec4afaff8).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154240240
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154241059
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44090030
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, 
we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpoint(shardId, Option(checkpointer))
+checkpointers.remove(shardId)
+  }
+
+  /** Perform the checkpoint. Exposed for tests. */
+  private[kinesis] def checkpoint(
+  shardId: String,
+  checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
--- End diff --

But for that, a cleaner design is to update the checkpointer in the map and 
then call `checkpoint(shardId)`. It's confusing code to see that 

Also, why have it as an `Option` and later test for cases where it is `None`?
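
The design suggested in this comment could be sketched roughly as follows. This is only an illustration under stated assumptions: `ShardCheckpointerSketch` is a hypothetical stand-in for the KCL `IRecordProcessorCheckpointer`, `CheckpointerRegistrySketch` is a hypothetical name, and the `latestSeqNum` function stands in for `receiver.getLatestSeqNumToCheckpoint`. For brevity the sketch skips only an identical sequence number instead of doing the ordering comparison from the diff.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the KCL IRecordProcessorCheckpointer interface.
trait ShardCheckpointerSketch {
  def checkpoint(sequenceNumber: String): Unit
}

// Hypothetical registry: checkpoint(shardId) always looks the checkpointer
// up in the map, so no Option parameter is needed.
class CheckpointerRegistrySketch(latestSeqNum: String => Option[String]) {
  private val checkpointers =
    new ConcurrentHashMap[String, ShardCheckpointerSketch]()
  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()

  def setCheckpointer(shardId: String, cp: ShardCheckpointerSketch): Unit = {
    checkpointers.put(shardId, cp)
  }

  /** Checkpoints the shard if a checkpointer is registered and there is new data. */
  def checkpoint(shardId: String): Unit = {
    for {
      cp <- Option(checkpointers.get(shardId))
      latest <- latestSeqNum(shardId)
      if latest != lastCheckpointedSeqNums.get(shardId)
    } {
      cp.checkpoint(latest)
      lastCheckpointedSeqNums.put(shardId, latest)
    }
  }

  /** Final checkpoint with the given checkpointer (skipped when null), then stop tracking. */
  def removeCheckpointer(shardId: String, cp: ShardCheckpointerSketch): Unit = {
    if (cp != null) {
      setCheckpointer(shardId, cp) // update the map first ...
      checkpoint(shardId)          // ... then reuse the single code path
    }
    checkpointers.remove(shardId)
  }
}
```

With this shape the periodic thread and the shutdown path both go through `checkpoint(shardId)`, and the `null` check lives in one place.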





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44083002
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, 
we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpoint(shardId, Option(checkpointer))
+checkpointers.remove(shardId)
+  }
+
+  /** Perform the checkpoint. Exposed for tests. */
+  private[kinesis] def checkpoint(
+  shardId: String,
+  checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
--- End diff --

Why pass the checkpointer if the `lastCheckpointedSeqNums` already has it?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44086994
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, 
we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpoint(shardId, Option(checkpointer))
+checkpointers.remove(shardId)
+  }
+
+  /** Perform the checkpoint. Exposed for tests. */
+  private[kinesis] def checkpoint(
+  shardId: String,
+  checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
--- End diff --

You mean `checkpointers`? I want `checkpoint` to be called during the 
shutdown of a record processor as well, independent of the periodic checkpoint 
time. Otherwise we have to keep track of "should I invalidate this 
checkpointer for this shardId?"





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44089550
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "123"
+  val otherSeqNum = "245"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+  val someOtherSeqNum = Some(otherSeqNum)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+receiverMock = mock[KinesisReceiver[Array[Byte]]]
+checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+checkpointState.checkpoint(shardId, None)
+checkpointState.checkpoint(shardId, Option(null))
+// the above two calls should be No-Ops
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is called after sequence number increases") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(seqNum)
+verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.setCheckpointer(shardId, checkpointerMock)
+Thread.sleep(checkpointInterval.milliseconds * 5)
+verify(checkpointerMock, times(1)).checkpoint(seqNum)
+verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+checkpointState.shutdown()
+  }
+
+  test("shouldn't checkpoint if we have not exceeded the checkpoint 
interval") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, 

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44090270
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  private val app = "TestKinesisReceiver"
+  private val stream = "mySparkStream"
+  private val endpoint = "endpoint-url"
+  private val workerId = "dummyWorkerId"
+  private val shardId = "dummyShardId"
+  private val seqNum = "123"
+  private val otherSeqNum = "245"
+  private val checkpointInterval = Duration(10)
+  private val someSeqNum = Some(seqNum)
+  private val someOtherSeqNum = Some(otherSeqNum)
+
+  private var receiverMock: KinesisReceiver[Array[Byte]] = _
+  private var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
--- End diff --

Nit: don't we generally use `before { }`?
```
[tdas @ Zion spark2] git grep "beforeFunction" | wc -l
   5
[tdas @ Zion spark2] git grep "before {" | wc -l
  44
```





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154245330
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44083518
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, 
we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpoint(shardId, Option(checkpointer))
+checkpointers.remove(shardId)
+  }
+
+  /** Perform the checkpoint. Exposed for tests. */
+  private[kinesis] def checkpoint(
+  shardId: String,
+  checkpointer: Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
+receiver.getLatestSeqNumToCheckpoint(shardId).foreach { 
latestSeqNum =>
+  val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
+  // Kinesis sequence numbers are monotonically increasing 
strings, therefore we can do
+  // safely do the string comparison
+  if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
+/* Perform the checkpoint */
+
KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 4, 100)
+logDebug(s"Checkpoint:  WorkerId $workerId completed 
checkpoint at sequence number" +
+  s" $latestSeqNum for shardId $shardId")
+lastCheckpointedSeqNums.put(shardId, latestSeqNum)
+  }
+}
+  }
+} catch {
+  case NonFatal(e) =>
+logError("Failed to checkpoint to DynamoDB.", e)
+}
+  }
+
+  /**
+   * Start the checkpointer thread with the given checkpoint duration. 
Exposed for tests.
+   */
+  private[kinesis] def startCheckpointerThread(): ScheduledFuture[_] = {
+val period = checkpointInterval.milliseconds
+val ex =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"Kinesis 
Checkpointer - Worker $workerId")
+val task = new Runnable {
+  def run() = {
+val shardIds = 

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44083524
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ */
+private[kinesis] class KinesisCheckpointer(
+receiver: KinesisReceiver[_],
+checkpointInterval: Duration,
+workerId: String) extends Logging {
+
+  // a map from shardId's to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, 
IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, 
String]()
+
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the 
given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown 
[[ShutdownReason.TERMINATE]],
--- End diff --

I'm worried that the IRecordProcessorCheckpointers may correspond to the 
wrong instances during or after resharding. Therefore, I would like to use 
the most recent checkpointer whenever possible.
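
To illustrate the idea of always holding the most recent checkpointer per shard, here is a minimal sketch. It assumes a hypothetical `Checkpointer` trait standing in for the KCL's IRecordProcessorCheckpointer, and a hypothetical `LatestCheckpointers` class; neither name comes from the patch.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the KCL's IRecordProcessorCheckpointer.
trait Checkpointer { def checkpoint(seqNum: String): Unit }

// Hypothetical holder that keeps only the most recently supplied
// checkpointer for each shard.
final class LatestCheckpointers {
  private val byShard = new ConcurrentHashMap[String, Checkpointer]()

  // Called whenever a new checkpointer arrives; a newer instance simply
  // overwrites the older one, so a stale instance is never reused.
  def set(shardId: String, cp: Checkpointer): Unit = byShard.put(shardId, cp)

  // Returns the current checkpointer for the shard, if any.
  def get(shardId: String): Option[Checkpointer] = Option(byShard.get(shardId))
}
```

Because every update replaces the previous entry, a periodic checkpoint task that reads from this map would pick up whichever instance was registered last.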





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44088418
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "123"
+  val otherSeqNum = "245"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+  val someOtherSeqNum = Some(otherSeqNum)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+receiverMock = mock[KinesisReceiver[Array[Byte]]]
+checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
--- End diff --

Please rename the variables correctly; it's not checkpointState any more.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154241070
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44089287
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "123"
+  val otherSeqNum = "245"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+  val someOtherSeqNum = Some(otherSeqNum)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+receiverMock = mock[KinesisReceiver[Array[Byte]]]
+checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+checkpointState.checkpoint(shardId, None)
+checkpointState.checkpoint(shardId, Option(null))
+// the above two calls should be No-Ops
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is called after sequence number increases") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(seqNum)
+verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.setCheckpointer(shardId, checkpointerMock)
+Thread.sleep(checkpointInterval.milliseconds * 5)
--- End diff --

Do not use Thread.sleep. It causes a lot of flakiness, and we have been 
actively removing sleeps from everywhere.
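
One common alternative to a fixed sleep is to poll for the expected condition with a bounded timeout, which is roughly what ScalaTest's `eventually` does. The sketch below is a plain-Scala approximation of that idea; the `Poll` object and its `eventually` method are hypothetical names, not part of Spark or ScalaTest.

```scala
import scala.annotation.tailrec

// Hypothetical helper: re-checks a condition instead of sleeping for a
// fixed amount of time and hoping the work has finished.
object Poll {
  /** Re-evaluates `condition` every `intervalMs` until it holds or
    * `timeoutMs` has elapsed. Returns whether the condition held. */
  def eventually(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def loop(): Boolean =
      if (condition) true
      else if (System.nanoTime() >= deadline) false
      else { Thread.sleep(intervalMs); loop() }

    loop()
  }
}
```

The short sleep inside the loop only sets the polling interval. The test returns as soon as the condition holds, so it is not tied to one hard-coded wait and the timeout can be generous without slowing down the passing case.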



[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44089562
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "123"
+  val otherSeqNum = "245"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+  val someOtherSeqNum = Some(otherSeqNum)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+receiverMock = mock[KinesisReceiver[Array[Byte]]]
+checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+checkpointState.checkpoint(shardId, None)
+checkpointState.checkpoint(shardId, Option(null))
+// the above two calls should be No-Ops
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is called after sequence number increases") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(seqNum)
+verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+  .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.setCheckpointer(shardId, checkpointerMock)
+Thread.sleep(checkpointInterval.milliseconds * 5)
+verify(checkpointerMock, times(1)).checkpoint(seqNum)
+verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+checkpointState.shutdown()
+  }
+
+  test("shouldn't checkpoint if we have not exceeded the checkpoint 
interval") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, 

[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44090207
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  private val app = "TestKinesisReceiver"
+  private val stream = "mySparkStream"
+  private val endpoint = "endpoint-url"
+  private val workerId = "dummyWorkerId"
+  private val shardId = "dummyShardId"
+  private val seqNum = "123"
+  private val otherSeqNum = "245"
+  private val checkpointInterval = Duration(10)
+  private val someSeqNum = Some(seqNum)
+  private val someOtherSeqNum = Some(otherSeqNum)
+
+  private var receiverMock: KinesisReceiver[Array[Byte]] = _
+  private var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+receiverMock = mock[KinesisReceiver[Array[Byte]]]
+checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("checkpoint is not called for None's and nulls") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
+  new KinesisCheckpointer(receiverMock, checkpointInterval, workerId)
+checkpointState.checkpoint(shardId, Option(checkpointerMock))
+
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+checkpointState.checkpoint(shardId, None)
+checkpointState.checkpoint(shardId, Option(null))
+// the above two calls should be No-Ops
+verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+val checkpointState =
--- End diff --

Can't some of this duplicated code move into the before?
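
As a rough illustration of pulling repeated setup out of each test, the sketch below uses a loan-style fixture helper in plain Scala. This is a stand-in for a ScalaTest `before` block, not the suite's actual code; `SimpleCheckpointer` and `Fixture.withCheckpointer` are hypothetical, simplified names.

```scala
// Hypothetical, simplified stand-in for KinesisCheckpointer; not the real class.
final class SimpleCheckpointer(latest: () => Option[String]) {
  private var last: Option[String] = None
  var calls: Vector[String] = Vector.empty

  // Records a checkpoint only when the sequence number has changed.
  def checkpoint(): Unit = latest().foreach { seq =>
    if (!last.contains(seq)) {
      calls = calls :+ seq
      last = Some(seq)
    }
  }
}

object Fixture {
  // Builds the object under test in one place, so each test body
  // contains only the calls and assertions specific to that test.
  def withCheckpointer[A](seqNum: Option[String])(body: SimpleCheckpointer => A): A =
    body(new SimpleCheckpointer(() => seqNum))
}
```

A test would then read as `Fixture.withCheckpointer(Some("123")) { cp => ... }`, with the construction shared across all cases.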







[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154245314
  
**[Test build #45174 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/consoleFull)**
 for PR 9421 at commit 
[`88a67bc`](https://github.com/apache/spark/commit/88a67bc1cecee6ebfba0f00774ba64308b266eef).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154245332
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45174/





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44067306
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T](
 shardIdToLatestStoredSeqNum.get(shardId)
   }
 
+  /** Pass the setCheckpointer request to the checkpointer. */
+  private[kinesis] def setCheckpointerForShardId(
--- End diff --

setCheckpointer is sufficient. The Scaladoc should just say "Set the 
checkpointer that will be used to checkpoint sequence numbers to DynamoDB". 
Whether or not there is a "setCheckpointer" method on some other 
checkpointer is an implementation detail.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154188753
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154188755
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44069549
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T](
*/
   override def shutdown(checkpointer: IRecordProcessorCheckpointer, 
reason: ShutdownReason) {
 logInfo(s"Shutdown:  Shutting down workerId $workerId with reason 
$reason")
-reason match {
+// We want to return a checkpointer based on the shutdown cause. If we 
want to terminate,
--- End diff --

We do remove it, but we checkpoint one last time using the most recent 
checkpointer before removing it from the map.
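
The behaviour described here can be sketched as follows. This is a simplified illustration under assumptions, not the patch itself: `ShardCheckpointer` is a hypothetical stand-in for the KCL's IRecordProcessorCheckpointer, and `ShardTracker` with its `latestSeqNum` function is a hypothetical stand-in for the checkpointer/receiver pair.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the KCL's IRecordProcessorCheckpointer.
trait ShardCheckpointer { def checkpoint(seqNum: String): Unit }

// Hypothetical tracker; `latestSeqNum` plays the role of the receiver's
// lookup of the latest sequence number that is safe to checkpoint.
final class ShardTracker(latestSeqNum: String => Option[String]) {
  private val checkpointers = new ConcurrentHashMap[String, ShardCheckpointer]()

  def set(shardId: String, cp: ShardCheckpointer): Unit = checkpointers.put(shardId, cp)

  def isTracked(shardId: String): Boolean = checkpointers.containsKey(shardId)

  // If `cp` is non-null, make one final checkpoint with it. If it is null
  // (for example on a ZOMBIE shutdown), skip the checkpoint. In both cases
  // the shard is removed from the map afterwards.
  def remove(shardId: String, cp: ShardCheckpointer): Unit = {
    for (c <- Option(cp); seq <- latestSeqNum(shardId)) c.checkpoint(seq)
    checkpointers.remove(shardId)
  }
}
```

The removal is unconditional; only the final checkpoint depends on whether a usable checkpointer was passed in.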





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44065511
  
--- Diff: 
extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase, Milliseconds}
+
+class KinesisCheckpointerSuite extends TestSuiteBase with MockitoSugar 
with BeforeAndAfter {
+
+  val app = "TestKinesisReceiver"
--- End diff --

Can you make all of them private? 





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44065382
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -135,18 +105,17 @@ private[kinesis] class KinesisRecordProcessor[T](
*/
   override def shutdown(checkpointer: IRecordProcessorCheckpointer, 
reason: ShutdownReason) {
 logInfo(s"Shutdown:  Shutting down workerId $workerId with reason 
$reason")
-reason match {
+// We want to return a checkpointer based on the shutdown cause. If we 
want to terminate,
--- End diff --

I don't get this. Don't we always want to remove the checkpointer when 
shutdown() is called, irrespective of the reason for the shutdown?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44077155
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -27,26 +27,23 @@ import 
com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
 
 import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
 
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
  * This implementation operates on the Array[Byte] from the 
KinesisReceiver.
  * The Kinesis Worker creates an instance of this KinesisRecordProcessor 
for each
- *   shard in the Kinesis stream upon startup.  This is normally done in 
separate threads,
- *   but the KCLs within the KinesisReceivers will balance themselves out 
if you create
- *   multiple Receivers.
+ * shard in the Kinesis stream upon startup.  This is normally done in 
separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if 
you create
+ * multiple Receivers.
  *
  * @param receiver Kinesis receiver
  * @param workerId for logging purposes
- * @param checkpointState represents the checkpoint state including the 
next checkpoint time.
- *   It's injected here for mocking purposes.
  */
-private[kinesis] class KinesisRecordProcessor[T](
-receiver: KinesisReceiver[T],
-workerId: String,
-checkpointState: KinesisCheckpointState) extends IRecordProcessor with 
Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: 
KinesisReceiver[T], workerId: String)
+  extends IRecordProcessor with Logging {
 
-  // shardId to be populated during initialize()
+  // shardId and checkpointState to be populated during initialize()
--- End diff --

this is irrelevant now.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-154188680
  
**[Test build #45139 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45139/consoleFull)**
 for PR 9421 at commit 
[`457c6c6`](https://github.com/apache/spark/commit/457c6c689a5b309137f7a91f4466c4bac19e19f0).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r44076396
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -216,6 +226,25 @@ private[kinesis] class KinesisReceiver[T](
 shardIdToLatestStoredSeqNum.get(shardId)
   }
 
+  /** Pass the setCheckpointer request to the checkpointer. */
+  private[kinesis] def setCheckpointerForShardId(
+  shardId: String,
+  checkpointer: IRecordProcessorCheckpointer): Unit = {
+assert(getCheckpointer() != null, "Kinesis Checkpointer not 
initialized!")
+getCheckpointer().setCheckpointer(shardId, checkpointer)
+  }
+
+  /** Pass the removeCheckpointer request to the checkpointer. */
--- End diff --

Same comment as above





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43967600
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the 
derived time
-   *   for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+_checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: 
Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
--- End diff --

It should be okay





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43857597
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
--- End diff --

You don't need T here. You can take a reference to the receiver as 
`KinesisReceiver[_]`.
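
To illustrate the wildcard reference, here is a minimal sketch. `Receiver` and 
`CheckpointHelper` are hypothetical stand-ins for `KinesisReceiver` and the 
checkpoint helper in this diff, not the real classes; the point is only that a 
class which never touches values of type `T` can accept a receiver of any 
element type.

```scala
// Hypothetical stand-in for KinesisReceiver[T]; T is the record element type.
class Receiver[T](seqNums: Map[String, String]) {
  def latestSeqNum(shardId: String): Option[String] = seqNums.get(shardId)
}

// The helper only reads sequence numbers, which do not depend on T, so it
// takes Receiver[_] instead of declaring its own type parameter.
class CheckpointHelper(receiver: Receiver[_], shardId: String) {
  def pending: Option[String] = receiver.latestSeqNum(shardId)
}
```

Callers can then write `new CheckpointHelper(receiver, shardId)` without 
passing a type argument.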





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43857672
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
--- End diff --

Also could rename this to KinesisCheckpointer, to better reflect what this 
is doing.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43857784
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -57,6 +58,7 @@ private[kinesis] class KinesisRecordProcessor[T](
*/
   override def initialize(shardId: String) {
 this.shardId = shardId
+checkpointState = new KinesisCheckpointState[T](receiver, 
checkpointInterval, workerId, shardId)
--- End diff --

Don't need to pass on T.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43857174
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the 
derived time
-   *   for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+_checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: 
Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
+receiver.getLatestSeqNumToCheckpoint(shardId).foreach { 
latestSeqNum =>
+  /* Perform the checkpoint */
+  KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 
4, 100)
+
--- End diff --

nit: extra line





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43859706
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the 
derived time
-   *   for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+_checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: 
Option[IRecordProcessorCheckpointer]): Unit = {
--- End diff --

Why pass the checkpointer as an Option when the class already has a 
reference to it in `_checkpointer`?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153673079
  
There is a general problem that bugs me. I am not sure whether the KCL 
guarantees that `IRecordProcessor.shutdown()` will be called when the worker 
is shut down. From the source code of the KCL, it seems that shutdown will 
only be called when the corresponding processor loses the shard lease or the 
shard is terminated due to resharding. So we cannot guarantee that the 
shutdown of the KinesisCheckpointState will be called when the receiver stops. 

For that to happen, the design needs to change. There should be a single 
central `KinesisCheckpointer` in the `KinesisReceiver` that is responsible for 
checkpointing all the shards. All the `KinesisRecordProcessor`s register and 
deregister their `IRecordProcessorCheckpointer` with the `KinesisCheckpointer` 
for each `shardId`. For each shardId, the `KinesisCheckpointer` will 
periodically try to call `KinesisRecordProcessor.checkpoint()` with the 
sequence number stored for that shardId in `shardIdToLatestStoredSeqNum`. As 
an optimization, it will keep track of the last saved sequence number for each 
shardId and call checkpoint only if it has changed. Finally, when the receiver 
is stopped, `KinesisCheckpointer.shutdown` will do a final round of 
checkpointing.
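
A rough sketch of that design, to make the moving parts concrete. This is not 
the PR's code: `ShardCheckpointer` is a hypothetical stand-in for the KCL's 
`IRecordProcessorCheckpointer`, `latestSeqNum` is an assumed callback in place 
of the receiver's per-shard lookup, and `CentralCheckpointer` and its method 
names are illustrative only.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Hypothetical stand-in for IRecordProcessorCheckpointer.
trait ShardCheckpointer {
  def checkpoint(sequenceNumber: String): Unit
}

// One instance per receiver; it checkpoints every registered shard.
class CentralCheckpointer(latestSeqNum: String => Option[String], intervalMs: Long) {
  private val checkpointers = new ConcurrentHashMap[String, ShardCheckpointer]()
  private val lastCheckpointed = new ConcurrentHashMap[String, String]()
  private val executor = Executors.newSingleThreadScheduledExecutor()

  // Periodic round; checkpointAll() swallows non-fatal errors, so the
  // scheduled task is not cancelled by a failed checkpoint.
  executor.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = checkpointAll()
  }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

  /** Called by a record processor when it starts owning a shard. */
  def register(shardId: String, cp: ShardCheckpointer): Unit = {
    checkpointers.put(shardId, cp)
  }

  /** Called by a record processor when it stops owning a shard. */
  def deregister(shardId: String): Unit = {
    checkpointers.remove(shardId)
    lastCheckpointed.remove(shardId)
  }

  /** Checkpoint each shard whose sequence number changed since the last round. */
  def checkpointAll(): Unit = synchronized {
    checkpointers.asScala.foreach { case (shardId, cp) =>
      try {
        latestSeqNum(shardId).foreach { seqNum =>
          if (seqNum != lastCheckpointed.get(shardId)) {
            cp.checkpoint(seqNum)
            lastCheckpointed.put(shardId, seqNum)
          }
        }
      } catch {
        case NonFatal(e) =>
          System.err.println(s"Failed to checkpoint shard $shardId: $e")
      }
    }
  }

  /** Stop the periodic task, then do one final round of checkpointing. */
  def shutdown(): Unit = {
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
    checkpointAll()
    checkpointers.clear()
  }
}
```

With this shape, the receiver's stop path would call `shutdown()` directly, so 
the final checkpoint does not depend on the KCL invoking 
`IRecordProcessor.shutdown()`.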






[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43817473
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the 
derived time
-   *   for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+_checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: 
Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
--- End diff --

What will happen if we use an old `IRecordProcessorCheckpointer` to 
checkpoint?





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43815388
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
 
-  /**
-   * Check if it's time to checkpoint based on the current time and the 
derived time
-   *   for the next checkpoint
-   *
-   * @return true if it's time to checkpoint
-   */
-  def shouldCheckpoint(): Boolean = {
-new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
+  private val checkpointerThread = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one. */
+  def setCheckpointer(checkpointer: IRecordProcessorCheckpointer): Unit = {
+_checkpointer = Option(checkpointer)
+  }
+
+  /** Perform the checkpoint */
+  private def checkpoint(checkpointer: 
Option[IRecordProcessorCheckpointer]): Unit = {
+// if this method throws an exception, then the scheduled task will 
not run again
+try {
+  checkpointer.foreach { cp =>
+receiver.getLatestSeqNumToCheckpoint(shardId).foreach { 
latestSeqNum =>
+  /* Perform the checkpoint */
+  KinesisRecordProcessor.retryRandom(cp.checkpoint(latestSeqNum), 
4, 100)
+
+  logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint 
at sequence number" +
+s" $latestSeqNum for shardId $shardId")
+}
+  }
+} catch {
+  case NonFatal(e) =>
+logError("Failed to checkpoint to DynamoDB.", e)
+}
+  }
+
+  /** Start the checkpointer thread with the given checkpoint duration. */
+  private def startCheckpointerThread(): ScheduledFuture[_] = {
+val period = checkpointInterval.milliseconds
+val ex = new ScheduledThreadPoolExecutor(1)
--- End diff --

This needs to be shut down. Otherwise, it will leak a thread. BTW, it's 
better to add a name and use daemon threads. You can use 
`ThreadUtils.newDaemonSingleThreadScheduledExecutor` to create a named one.
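
For reference, a sketch of what a named daemon scheduler amounts to, written 
against plain `java.util.concurrent` so it stands alone. 
`DaemonSchedulerSketch` and `newNamedDaemonScheduler` are hypothetical names 
for illustration; inside Spark the `ThreadUtils` helper mentioned above would 
be the thing to call.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

object DaemonSchedulerSketch {
  /** Build a single-thread scheduler whose worker thread is a named daemon. */
  def newNamedDaemonScheduler(name: String): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        // A daemon thread does not keep the JVM alive if shutdown is missed.
        t.setDaemon(true)
        t
      }
    }
    Executors.newSingleThreadScheduledExecutor(factory)
  }
}
```

The owner still needs to call `shutdown()` on the returned executor when the 
checkpointer stops; the daemon flag only limits the damage if that is 
forgotten.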





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/9421#discussion_r43817494
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -16,39 +16,77 @@
  */
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Duration
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
- * This is a helper class for managing checkpoint clocks.
+ * This is a helper class for managing Kinesis checkpointing.
  *
- * @param checkpointInterval
- * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ * @param receiver The receiver that keeps track of which sequence numbers 
we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param shardId The shard this worker was consuming data from
  */
-private[kinesis] class KinesisCheckpointState(
+private[kinesis] class KinesisCheckpointState[T](
+receiver: KinesisReceiver[T],
 checkpointInterval: Duration,
-currentClock: Clock = new SystemClock())
-  extends Logging {
+workerId: String,
+shardId: String) extends Logging {
 
-  /* Initialize the checkpoint clock using the given currentClock + 
checkpointInterval millis */
-  val checkpointClock = new ManualClock()
-  checkpointClock.setTime(currentClock.getTimeMillis() + 
checkpointInterval.milliseconds)
+  private var _checkpointer: Option[IRecordProcessorCheckpointer] = None
--- End diff --

This should be `volatile`
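
A minimal sketch of the change, assuming the field is written by the record 
processor thread and read by the checkpointing thread. `LatestCheckpointer` is 
a hypothetical, simplified holder rather than the class in this diff.

```scala
// Holds the most recent checkpointer handed over by another thread.
class LatestCheckpointer[C] {
  // @volatile makes a write from one thread visible to later reads from
  // another thread without additional locking.
  @volatile private var _checkpointer: Option[C] = None

  /** Replace the held instance; a null argument clears it. */
  def set(checkpointer: C): Unit = {
    _checkpointer = Option(checkpointer)
  }

  /** The most recently set instance, if any. */
  def current: Option[C] = _checkpointer
}
```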





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153171076
  
Merged build started.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153171035
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153171831
  
**[Test build #44839 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/consoleFull)** for PR 9421 at commit [`feb69ba`](https://github.com/apache/spark/commit/feb69bacd568c8a8a8112d11981c626482d4af1a).





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/9421

[SPARK-11359][STREAMING][KINESIS] Checkpoint to DynamoDB even when new data doesn't come in

Currently, the checkpoints to DynamoDB occur only when new data comes in, 
as we update the clock for the checkpointState. This PR makes the checkpoint a 
scheduled execution based on the `checkpointInterval`.
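
As a rough sketch of the idea (not the code in this PR), a checkpoint can be driven by a timer instead of by incoming records. The names `ScheduledCheckpointSketch`, `start`, and `intervalMs` are hypothetical; only `java.util.concurrent` and `NonFatal` are taken from the imports in the diff.

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.util.control.NonFatal

object ScheduledCheckpointSketch {

  /**
   * Runs `checkpoint` every `intervalMs` milliseconds on a dedicated thread,
   * whether or not new data has arrived. Returns a function that stops it.
   */
  def start(intervalMs: Long)(checkpoint: () => Unit): () => Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit = {
        // scheduleAtFixedRate cancels all future runs if the task throws,
        // so non-fatal failures are caught and the next tick retries.
        try {
          checkpoint()
        } catch {
          case NonFatal(e) => println(s"Checkpoint failed, will retry: $e")
        }
      }
    }
    executor.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    () => { executor.shutdownNow(); () }
  }
}
```

In this sketch the caller keeps the returned function and invokes it on shutdown, for example `val stop = ScheduledCheckpointSketch.start(1000L)(() => doCheckpoint())` followed later by `stop()`, where `doCheckpoint` is whatever performs the actual checkpoint.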

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark kinesis-checkpoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9421


commit feb69bacd568c8a8a8112d11981c626482d4af1a
Author: Burak Yavuz 
Date:   2015-11-02T21:56:39Z

update Kinesis checkpointing







[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153181727
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153181593
  
**[Test build #44839 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/consoleFull)** for PR 9421 at commit [`feb69ba`](https://github.com/apache/spark/commit/feb69bacd568c8a8a8112d11981c626482d4af1a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11359][STREAMING][KINESIS] Checkpoint t...

2015-11-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9421#issuecomment-153181731
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44839/
Test PASSed.

