svn commit: r25806 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_17_18_01-6937571-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-03-17 Thread pwendell
Author: pwendell
Date: Sun Mar 18 01:15:47 2018
New Revision: 25806

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_03_17_18_01-6937571 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 parts, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer (branch-2.3)

2018-03-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 21b6de459 -> 6937571ab


[SPARK-23623][SS] Avoid concurrent use of cached consumers in 
CachedKafkaConsumer (branch-2.3)

This is a backport of #20767 to branch 2.3

## What changes were proposed in this pull request?
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a 
pool of KafkaConsumers that can be reused. However, it was built with the 
assumption that only one task will try to read the same Kafka TopicPartition 
at a time. Hence, the cache was keyed by the TopicPartition a consumer is 
supposed to read, and for any case where this assumption may not hold, there 
is a SparkPlan flag to disable use of the cache. So it was up to the planner 
to correctly identify when it was not safe to use the cache and set the flag 
accordingly.
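
For illustration only, here is a minimal sketch of that old scheme. The names 
(`OldCacheSketch`, `StubConsumer`, `useCache`) are invented for this example, 
and a stub class stands in for a real Kafka consumer; this is not Spark's 
actual code:

```scala
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition

// Stand-in for a real Kafka consumer, to keep the sketch self-contained.
class StubConsumer(val tp: TopicPartition)

object OldCacheSketch {
  // One consumer per partition -- only safe if at most one task reads that
  // partition at a time.
  private val cache = mutable.Map.empty[TopicPartition, StubConsumer]

  // `useCache` mirrors the planner-set flag described above.
  def getOrCreate(tp: TopicPartition, useCache: Boolean): StubConsumer =
    synchronized {
      if (!useCache) new StubConsumer(tp) // fresh consumer; caller closes it
      else cache.getOrElseUpdate(tp, new StubConsumer(tp))
      // NOTE: no in-use check above -- two concurrent tasks that both pass
      // useCache = true receive the *same* object, which is the bug.
    }
}
```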

Fundamentally, this is the wrong way to approach the problem. It is HARD for 
a high-level planner to reason about the low-level execution model, that is, 
whether multiple tasks in the same query will try to read the same partition. 
Case in point: 2.3.0 introduced stream-stream joins, and you can build a 
streaming self-join query on Kafka. It is non-trivial to see how such a query 
leads to two tasks reading the same partition, possibly concurrently, and 
precisely because it is non-trivial, it is hard to detect in the planner and 
set the flag to avoid the cache / consumer pool. This can inadvertently lead 
to a ConcurrentModificationException or, worse, silent reading of incorrect 
data.
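
As a concrete sketch of such a query (broker and topic names are 
placeholders, and this is illustrative rather than taken from the PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object SelfJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-self-join").getOrCreate()
    // Both sides of the join scan the same topic-partitions, so two tasks in
    // the same query may read one partition concurrently.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "events")                    // placeholder
      .load()
    val joined = stream.as("l").join(stream.as("r"), expr("l.key = r.key"))
    joined.writeStream.format("console").start().awaitTermination()
  }
}
```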

Here is a better way to design this. The planner shouldn't have to understand 
these low-level optimizations. Rather, the consumer pool should be smart 
enough to avoid concurrent use of a cached consumer. Currently, it tries to 
do so but incorrectly (the `inuse` flag is not checked when returning a 
cached consumer, see 
[this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)).
 If there is another request for the same partition as a currently in-use 
consumer, the pool should automatically return a fresh consumer that is 
closed when the task is done. Then the planner does not need a flag to 
avoid reuse.
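
A minimal sketch of that acquire/release behavior, assuming invented names 
(`SmarterPoolSketch`, `PooledConsumer`); it mirrors the design described 
above, not Spark's actual implementation:

```scala
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition

// Illustrative pooled consumer: `cached` records which kind it is.
class PooledConsumer(val tp: TopicPartition, val cached: Boolean) {
  var inUse = false
}

object SmarterPoolSketch {
  private val cache = mutable.Map.empty[TopicPartition, PooledConsumer]

  def acquire(tp: TopicPartition): PooledConsumer = synchronized {
    cache.get(tp) match {
      case Some(c) if !c.inUse =>
        c.inUse = true; c                      // reuse the idle cached consumer
      case Some(_) =>
        new PooledConsumer(tp, cached = false) // cached one busy: fresh consumer
      case None =>
        val c = new PooledConsumer(tp, cached = true)
        c.inUse = true; cache.put(tp, c); c    // first request: cache and hand out
    }
  }

  def release(c: PooledConsumer): Unit = synchronized {
    if (c.cached) c.inUse = false // back to the pool
    else () // a real implementation would close the underlying consumer here
  }
}
```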

This PR is a step towards that goal. It does the following:
- There are effectively two kinds of consumers that may be handed out:
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait, KafkaDataConsumer, is introduced to hide this difference from the 
users of the consumer, so that client code does not have to reason about 
whether to stop and release. It simply calls `val consumer = 
KafkaDataConsumer.acquire` and then `consumer.release()` (see the sketch 
after this list).
- If there is a request for a consumer that is already in use, a new consumer 
is generated.
- If there is a concurrent attempt by the same task, a new consumer is 
generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes, because CachedKafkaConsumer is a 
misnomer given that what it returns may or may not be cached.
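
A simplified sketch of the trait-based API these bullets describe, with 
invented names (the real trait lives in KafkaDataConsumer.scala and has a 
richer interface):

```scala
// One trait hides the two kinds of consumers from client code.
trait DataConsumerSketch {
  def release(): Unit // returns to the pool OR closes, depending on the kind
}

final class CachedKind(returnToPool: () => Unit) extends DataConsumerSketch {
  def release(): Unit = returnToPool() // cached kind: goes back to the pool
}

final class NonCachedKind(close: () => Unit) extends DataConsumerSketch {
  def release(): Unit = close()        // non-cached kind: closed at task end
}

// Task code is identical either way:
//   val consumer: DataConsumerSketch = pool.acquire(...)
//   try { /* read records */ } finally { consumer.release() }
```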

This PR does not remove the planner flag that avoids reuse, to make this 
patch safe enough for merging into branch-2.3. That removal can be done 
later, in master only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for 
the same partition from the consumer pool.
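
A minimal sketch of such a stress test, written against the illustrative 
`SmarterPoolSketch` from earlier rather than Spark's actual 
KafkaDataConsumerSuite:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.kafka.common.TopicPartition

// Many threads acquire/release consumers for one partition, asserting that
// no consumer is ever held by two threads at once.
object PoolStressSketch {
  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newFixedThreadPool(16)
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(threadPool)
    val tp = new TopicPartition("events", 0)
    val active = ConcurrentHashMap.newKeySet[PooledConsumer]()

    val tasks = (1 to 1000).map { _ =>
      Future {
        val c = SmarterPoolSketch.acquire(tp)
        assert(active.add(c), "one consumer handed to two threads at once")
        Thread.sleep(1) // simulate reading records
        active.remove(c)
        SmarterPoolSketch.release(c)
      }
    }
    Await.result(Future.sequence(tasks), 2.minutes)
    threadPool.shutdown()
    println("no concurrent reuse detected")
  }
}
```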

Author: Tathagata Das 

Closes #20848 from tdas/SPARK-23623-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6937571a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6937571a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6937571a

Branch: refs/heads/branch-2.3
Commit: 6937571ab8818a62ec2457a373eb3f6f618985e1
Parents: 21b6de4
Author: Tathagata Das 
Authored: Sat Mar 17 16:24:51 2018 -0700
Committer: Shixiong Zhu 
Committed: Sat Mar 17 16:24:51 2018 -0700

--
 .../sql/kafka010/CachedKafkaConsumer.scala  | 438 
 .../sql/kafka010/KafkaContinuousReader.scala|   4 +-
 .../spark/sql/kafka010/KafkaDataConsumer.scala  | 516 +++
 .../spark/sql/kafka010/KafkaSourceRDD.scala |  23 +-
 .../sql/kafka010/CachedKafkaConsumerSuite.scala |  34 --
 .../sql/kafka010/KafkaDataConsumerSuite.scala   | 124 +
 6 files changed, 648 insertions(+), 491 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6937571a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git