This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b205269  [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
b205269 is described below

commit b205269ae09dc384de98ab027da4c17abf3a9dd9
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Mon Aug 26 13:12:14 2019 -0700

    [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
    
    ### What changes were proposed in this pull request?
    When a Task retry happens with the Kafka source, it is not known whether the consumer is the issue, so the old consumer is removed from the cache and a new consumer is created. The feature works fine but is not covered by tests.
    
    In this PR I've added such tests for both DStreams and Structured Streaming.
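    
    For context, the retry detection these tests exercise comes down to checking the task attempt number. A minimal sketch of the idea, assuming only Spark's TaskContext API (not the exact KafkaDataConsumer code path):
    
        import org.apache.spark.TaskContext
        
        // An attempt number greater than zero means the task is being retried,
        // so a cached consumer is treated as suspect and a fresh one is used.
        def isTaskRetry(): Boolean = {
          val ctx = TaskContext.get()
          ctx != null && ctx.attemptNumber() > 0
        }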
    
    ### Why are the changes needed?
    No such tests exist yet.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing + new unit tests.
    
    Closes #25582 from gaborgsomogyi/SPARK-28875.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../spark/sql/kafka010/KafkaDataConsumer.scala     |  6 +--
 .../sql/kafka010/KafkaDataConsumerSuite.scala      | 62 +++++++++++++++-------
 .../kafka010/KafkaDataConsumerSuite.scala          | 24 +++++++++
 3 files changed, 71 insertions(+), 21 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index cbb99fd..af240dc 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -78,7 +78,7 @@ private[kafka010] sealed trait KafkaDataConsumer {
   def release(): Unit
 
   /** Reference to the internal implementation that this wrapper delegates to */
-  protected def internalConsumer: InternalKafkaConsumer
+  def internalConsumer: InternalKafkaConsumer
 }
 
 
@@ -512,7 +512,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
     override def release(): Unit = { internalConsumer.close() }
   }
 
-  private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
+  private[kafka010] case class CacheKey(groupId: String, topicPartition: TopicPartition) {
     def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
       this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
   }
@@ -521,7 +521,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
   // - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
   //   The capacity is not guaranteed to be maintained, especially when there are more active
   //   tasks simultaneously using consumers than the capacity.
-  private lazy val cache = {
+  private[kafka010] lazy val cache = {
     val conf = SparkEnv.get.conf
     val capacity = conf.get(CONSUMER_CACHE_CAPACITY)
     new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
index 2aa869c..8aa7e06 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
@@ -20,22 +20,23 @@ package org.apache.spark.sql.kafka010
 import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.Duration
 import scala.util.Random
 
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerConfig._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.util.ThreadUtils
 
 class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester {
 
   protected var testUtils: KafkaTestUtils = _
+  private val topic = "topic" + Random.nextInt()
+  private val topicPartition = new TopicPartition(topic, 0)
+  private val groupId = "groupId"
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -51,6 +52,15 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
     super.afterAll()
   }
 
+  private def getKafkaParams() = Map[String, Object](
+    GROUP_ID_CONFIG -> "groupId",
+    BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+    KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+    VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+    AUTO_OFFSET_RESET_CONFIG -> "earliest",
+    ENABLE_AUTO_COMMIT_CONFIG -> "false"
+  ).asJava
+
   test("SPARK-19886: Report error cause correctly in reportDataLoss") {
     val cause = new Exception("D'oh!")
     val reportDataLoss = PrivateMethod[Unit]('reportDataLoss0)
@@ -60,23 +70,40 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
     assert(e.getCause === cause)
   }
 
+  test("new KafkaDataConsumer instance in case of Task retry") {
+    try {
+      KafkaDataConsumer.cache.clear()
+
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
+
+      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context1)
+      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
+      consumer1.release()
+
+      assert(KafkaDataConsumer.cache.size() == 1)
+      assert(KafkaDataConsumer.cache.get(key).eq(consumer1.internalConsumer))
+
+      val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
+      TaskContext.setTaskContext(context2)
+      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
+      consumer2.release()
+
+      // The first consumer should be removed from the cache and a new, non-cached one returned
+      assert(KafkaDataConsumer.cache.size() == 0)
+      assert(consumer1.internalConsumer.ne(consumer2.internalConsumer))
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
   test("SPARK-23623: concurrent use of KafkaDataConsumer") {
-    val topic = "topic" + Random.nextInt()
     val data = (1 to 1000).map(_.toString)
     testUtils.createTopic(topic, 1)
     testUtils.sendMessages(topic, data.toArray)
-    val topicPartition = new TopicPartition(topic, 0)
-
-    import ConsumerConfig._
-    val kafkaParams = Map[String, Object](
-      GROUP_ID_CONFIG -> "groupId",
-      BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
-      KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
-      VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
-      AUTO_OFFSET_RESET_CONFIG -> "earliest",
-      ENABLE_AUTO_COMMIT_CONFIG -> "false"
-    )
 
+    val kafkaParams = getKafkaParams()
     val numThreads = 100
     val numConsumerUsages = 500
 
@@ -90,8 +117,7 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
         null
       }
       TaskContext.setTaskContext(taskContext)
-      val consumer = KafkaDataConsumer.acquire(
-        topicPartition, kafkaParams.asJava, useCache)
+      val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache)
       try {
         val range = consumer.getAvailableOffsetRange()
         val rcvd = range.earliest until range.latest map { offset =>
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
index d8df549..431473e 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
@@ -87,6 +87,30 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before
     assert(existingInternalConsumer.eq(consumer2.internalConsumer))
   }
 
+  test("new KafkaDataConsumer instance in case of Task retry") {
+    KafkaDataConsumer.cache.clear()
+
+    val kafkaParams = getKafkaParams()
+    val key = new CacheKey(groupId, topicPartition)
+
+    val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+    val consumer1 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+      topicPartition, kafkaParams, context1, true)
+    consumer1.release()
+
+    assert(KafkaDataConsumer.cache.size() == 1)
+    assert(KafkaDataConsumer.cache.get(key).eq(consumer1.internalConsumer))
+
+    val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
+    val consumer2 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+      topicPartition, kafkaParams, context2, true)
+    consumer2.release()
+
+    // The first consumer should be removed from the cache and a new, non-cached one returned
+    assert(KafkaDataConsumer.cache.size() == 0)
+    assert(consumer1.internalConsumer.ne(consumer2.internalConsumer))
+  }
+
   test("concurrent use of KafkaDataConsumer") {
     val data = (1 to 1000).map(_.toString)
     testUtils.createTopic(topic)

