Repository: spark Updated Branches: refs/heads/branch-2.0 8d55886aa -> 4f3f09696
[SPARK-18400][STREAMING] NPE when resharding Kinesis Stream ## What changes were proposed in this pull request? Avoid NPE in KinesisRecordProcessor when shutdown happens without successful init ## How was this patch tested? Existing tests Author: Sean Owen <so...@cloudera.com> Closes #15882 from srowen/SPARK-18400. (cherry picked from commit 43a26899e5dd2364297eaf8985bd68367e4735a7) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f3f0969 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f3f0969 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f3f0969 Branch: refs/heads/branch-2.0 Commit: 4f3f09696ea12b631e1db8d00baf363292c5f3e3 Parents: 8d55886 Author: Sean Owen <so...@cloudera.com> Authored: Wed Nov 16 10:16:36 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Nov 16 10:16:58 2016 +0000 ---------------------------------------------------------------------- .../kinesis/KinesisRecordProcessor.scala | 42 +++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4f3f0969/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 80e0cce..a0ccd08 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record import org.apache.spark.internal.Logging -import org.apache.spark.streaming.Duration /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. @@ -102,27 +101,32 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE) */ - override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { + override def shutdown( + checkpointer: IRecordProcessorCheckpointer, + reason: ShutdownReason): Unit = { logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") - reason match { - /* - * TERMINATE Use Case. Checkpoint. - * Checkpoint to indicate that all records from the shard have been drained and processed. - * It's now OK to read from the new shards that resulted from a resharding event. - */ - case ShutdownReason.TERMINATE => - receiver.removeCheckpointer(shardId, checkpointer) + // null if not initialized before shutdown: + if (shardId == null) { + logWarning(s"No shardId for workerId $workerId?") + } else { + reason match { + /* + * TERMINATE Use Case. Checkpoint. + * Checkpoint to indicate that all records from the shard have been drained and processed. + * It's now OK to read from the new shards that resulted from a resharding event. + */ + case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer) - /* - * ZOMBIE Use Case or Unknown reason. NoOp. - * No checkpoint because other workers may have taken over and already started processing - * the same records. - * This may lead to records being processed more than once. - */ - case _ => - receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint + /* + * ZOMBIE Use Case or Unknown reason. NoOp. + * No checkpoint because other workers may have taken over and already started processing + * the same records. + * This may lead to records being processed more than once. + * Return null so that we don't checkpoint + */ + case _ => receiver.removeCheckpointer(shardId, null) + } } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org