Repository: spark Updated Branches: refs/heads/master e836c27ce -> 3fd0ccb13
[SPARK-23484][SS] Fix possible race condition in KafkaContinuousReader ## What changes were proposed in this pull request? var `KafkaContinuousReader.knownPartitions` should be threadsafe as it is accessed from multiple threads - the query thread at the time of reader factory creation, and the epoch tracking thread at the time of `needsReconfiguration`. ## How was this patch tested? Existing tests. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #20655 from tdas/SPARK-23484. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fd0ccb1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fd0ccb1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fd0ccb1 Branch: refs/heads/master Commit: 3fd0ccb13fea44727d970479af1682ef00592147 Parents: e836c27 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Feb 21 14:56:13 2018 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Feb 21 14:56:13 2018 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3fd0ccb1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 97a0f66..ecd1170 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -66,7 +66,7 @@ class KafkaContinuousReader( // Initialized when creating reader factories. If this diverges from the partitions at the latest // offsets, we need to reconfigure. // Exposed outside this object only for unit tests. - private[sql] var knownPartitions: Set[TopicPartition] = _ + @volatile private[sql] var knownPartitions: Set[TopicPartition] = _ override def readSchema: StructType = KafkaOffsetReader.kafkaSchema --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org