Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r215313215
  
    --- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
    @@ -0,0 +1,299 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecord
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class FetchedPoolSuite extends SharedSQLContext {
    +  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
    +
    +  private val dummyBytes = "dummy".getBytes
    +
    +  test("acquire fresh one") {
    +    val dataPool = FetchedDataPool.build
    +
    +    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
    +
    +    assert(dataPool.getCache.get(cacheKey).isEmpty)
    +
    +    val data = dataPool.acquire(cacheKey, 0)
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(dataPool.getCache(cacheKey).head.inUse)
    +
    +    data.withNewPoll(testRecords(0, 5).listIterator, 5)
    +
    +    dataPool.release(cacheKey, data)
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(!dataPool.getCache(cacheKey).head.inUse)
    +
    +    dataPool.shutdown()
    +  }
    +
    +  test("acquire fetched data from multiple keys") {
    +    val dataPool = FetchedDataPool.build
    +
    +    val cacheKeys = (0 to 10).map { partId =>
    +      CacheKey("testgroup", new TopicPartition("topic", partId))
    +    }
    +
    +    assert(dataPool.getCache.size === 0)
    +    cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
    +
    +    val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
    +
    +    assert(dataPool.getCache.size === cacheKeys.size)
    +    cacheKeys.map { key =>
    +      assert(dataPool.getCache(key).size === 1)
    +      assert(dataPool.getCache(key).head.inUse)
    +    }
    +
    +    dataList.map { case (_, data) =>
    +      data.withNewPoll(testRecords(0, 5).listIterator, 5)
    +    }
    +
    +    dataList.foreach { case (key, data) =>
    +      dataPool.release(key, data)
    +    }
    +
    +    assert(dataPool.getCache.size === cacheKeys.size)
    +    cacheKeys.map { key =>
    +      assert(dataPool.getCache(key).size === 1)
    +      assert(!dataPool.getCache(key).head.inUse)
    +    }
    +
    +    dataPool.shutdown()
    +  }
    +
    +  test("continuous use of fetched data from single key") {
    +    val dataPool = FetchedDataPool.build
    +
    +    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
    +
    +    assert(dataPool.getCache.get(cacheKey).isEmpty)
    +
    +    val data = dataPool.acquire(cacheKey, 0)
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(dataPool.getCache(cacheKey).head.inUse)
    +
    +    data.withNewPoll(testRecords(0, 5).listIterator, 5)
    +
    +    (0 to 3).foreach { _ => data.next() }
    +
    +    dataPool.release(cacheKey, data)
    +
    +    // suppose next batch
    +
    +    val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
    +
    +    assert(data.eq(data2))
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(dataPool.getCache(cacheKey).head.inUse)
    +
    +    dataPool.release(cacheKey, data2)
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(!dataPool.getCache(cacheKey).head.inUse)
    +
    +    dataPool.shutdown()
    +  }
    +
    +  test("multiple tasks referring same key continuously using fetched 
data") {
    +    val dataPool = FetchedDataPool.build
    +
    +    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
    +
    +    assert(dataPool.getCache.get(cacheKey).isEmpty)
    +
    +    val dataFromTask1 = dataPool.acquire(cacheKey, 0)
    +
    +    assert(dataPool.getCache(cacheKey).size === 1)
    +    assert(dataPool.getCache(cacheKey).head.inUse)
    +
    +    val dataFromTask2 = dataPool.acquire(cacheKey, 0)
    +
    +    // it shouldn't give same object as dataFromTask1 though it asks same 
offset
    +    // it definitely works when offsets are not overlapped: skip adding 
test for that
    +    assert(dataPool.getCache(cacheKey).size === 2)
    +    assert(dataPool.getCache(cacheKey)(1).inUse)
    +
    +    // reading from task 1
    +    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
    +
    +    (0 to 3).foreach { _ => dataFromTask1.next() }
    +
    +    dataPool.release(cacheKey, dataFromTask1)
    +
    +    // reading from task 2
    +    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
    +
    +    (0 to 5).foreach { _ => dataFromTask2.next() }
    +
    +    dataPool.release(cacheKey, dataFromTask2)
    +
    +    // suppose next batch for task 1
    +    val data2FromTask1 = dataPool.acquire(cacheKey, 
dataFromTask1.nextOffsetInFetchedData)
    +    assert(data2FromTask1.eq(dataFromTask1))
    +
    +    assert(dataPool.getCache(cacheKey).head.inUse)
    +
    +    // suppose next batch for task 2
    +    val data2FromTask2 = dataPool.acquire(cacheKey, 
dataFromTask2.nextOffsetInFetchedData)
    +    assert(data2FromTask2.eq(dataFromTask2))
    +
    +    assert(dataPool.getCache(cacheKey)(1).inUse)
    +
    +    // release from task 2
    +    dataPool.release(cacheKey, data2FromTask2)
    +    assert(!dataPool.getCache(cacheKey)(1).inUse)
    +
    +    // release from task 1
    +    dataPool.release(cacheKey, data2FromTask1)
    +    assert(!dataPool.getCache(cacheKey).head.inUse)
    +
    +    dataPool.shutdown()
    +  }
    +
    +  test("evict idle fetched data") {
    --- End diff --
    
    To make clear once again, was your finding a false alarm, or there's 
something to fix which I can reproduce the issue easily?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to