Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8d55886aa -> 4f3f09696


[SPARK-18400][STREAMING] NPE when resharding Kinesis Stream

## What changes were proposed in this pull request?

Avoid an NPE in KinesisRecordProcessor when shutdown happens without a successful initialization.

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #15882 from srowen/SPARK-18400.

(cherry picked from commit 43a26899e5dd2364297eaf8985bd68367e4735a7)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f3f0969
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f3f0969
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f3f0969

Branch: refs/heads/branch-2.0
Commit: 4f3f09696ea12b631e1db8d00baf363292c5f3e3
Parents: 8d55886
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Nov 16 10:16:36 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Nov 16 10:16:58 2016 +0000

----------------------------------------------------------------------
 .../kinesis/KinesisRecordProcessor.scala        | 42 +++++++++++---------
 1 file changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f3f0969/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 80e0cce..a0ccd08 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Duration
 
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
@@ -102,27 +101,32 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
    * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
    * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
    */
-  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+  override def shutdown(
+      checkpointer: IRecordProcessorCheckpointer,
+      reason: ShutdownReason): Unit = {
     logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
-    reason match {
-      /*
-       * TERMINATE Use Case.  Checkpoint.
-       * Checkpoint to indicate that all records from the shard have been drained and processed.
-       * It's now OK to read from the new shards that resulted from a resharding event.
-       */
-      case ShutdownReason.TERMINATE =>
-        receiver.removeCheckpointer(shardId, checkpointer)
+    // null if not initialized before shutdown:
+    if (shardId == null) {
+      logWarning(s"No shardId for workerId $workerId?")
+    } else {
+      reason match {
+        /*
+         * TERMINATE Use Case.  Checkpoint.
+         * Checkpoint to indicate that all records from the shard have been drained and processed.
+         * It's now OK to read from the new shards that resulted from a resharding event.
+         */
+        case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer)
 
-      /*
-       * ZOMBIE Use Case or Unknown reason.  NoOp.
-       * No checkpoint because other workers may have taken over and already started processing
-       *    the same records.
-       * This may lead to records being processed more than once.
-       */
-      case _ =>
-        receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
+        /*
+         * ZOMBIE Use Case or Unknown reason.  NoOp.
+         * No checkpoint because other workers may have taken over and already started processing
+         *    the same records.
+         * This may lead to records being processed more than once.
+         * Return null so that we don't checkpoint
+         */
+        case _ => receiver.removeCheckpointer(shardId, null)
+      }
     }
-
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to