Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215313215

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+    assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+    val data = dataPool.acquire(cacheKey, 0)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+    dataPool.release(cacheKey, data)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(!dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKeys = (0 to 10).map { partId =>
+      CacheKey("testgroup", new TopicPartition("topic", partId))
+    }
+
+    assert(dataPool.getCache.size === 0)
+    cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+    val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+    assert(dataPool.getCache.size === cacheKeys.size)
+    cacheKeys.map { key =>
+      assert(dataPool.getCache(key).size === 1)
+      assert(dataPool.getCache(key).head.inUse)
+    }
+
+    dataList.map { case (_, data) =>
+      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+    }
+
+    dataList.foreach { case (key, data) =>
+      dataPool.release(key, data)
+    }
+
+    assert(dataPool.getCache.size === cacheKeys.size)
+    cacheKeys.map { key =>
+      assert(dataPool.getCache(key).size === 1)
+      assert(!dataPool.getCache(key).head.inUse)
+    }
+
+    dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+    assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+    val data = dataPool.acquire(cacheKey, 0)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+    (0 to 3).foreach { _ => data.next() }
+
+    dataPool.release(cacheKey, data)
+
+    // suppose next batch
+
+    val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+    assert(data.eq(data2))
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.release(cacheKey, data2)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(!dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched data") {
+    val dataPool = FetchedDataPool.build
+
+    val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+    assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+    val dataFromTask1 = dataPool.acquire(cacheKey, 0)
+
+    assert(dataPool.getCache(cacheKey).size === 1)
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    val dataFromTask2 = dataPool.acquire(cacheKey, 0)
+
+    // it shouldn't give same object as dataFromTask1 though it asks same offset
+    // it definitely works when offsets are not overlapped: skip adding test for that
+    assert(dataPool.getCache(cacheKey).size === 2)
+    assert(dataPool.getCache(cacheKey)(1).inUse)
+
+    // reading from task 1
+    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+    (0 to 3).foreach { _ => dataFromTask1.next() }
+
+    dataPool.release(cacheKey, dataFromTask1)
+
+    // reading from task 2
+    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
+
+    (0 to 5).foreach { _ => dataFromTask2.next() }
+
+    dataPool.release(cacheKey, dataFromTask2)
+
+    // suppose next batch for task 1
+    val data2FromTask1 = dataPool.acquire(cacheKey, dataFromTask1.nextOffsetInFetchedData)
+    assert(data2FromTask1.eq(dataFromTask1))
+
+    assert(dataPool.getCache(cacheKey).head.inUse)
+
+    // suppose next batch for task 2
+    val data2FromTask2 = dataPool.acquire(cacheKey, dataFromTask2.nextOffsetInFetchedData)
+    assert(data2FromTask2.eq(dataFromTask2))
+
+    assert(dataPool.getCache(cacheKey)(1).inUse)
+
+    // release from task 2
+    dataPool.release(cacheKey, data2FromTask2)
+    assert(!dataPool.getCache(cacheKey)(1).inUse)
+
+    // release from task 1
+    dataPool.release(cacheKey, data2FromTask1)
+    assert(!dataPool.getCache(cacheKey).head.inUse)
+
+    dataPool.shutdown()
+  }
+
+  test("evict idle fetched data") {
--- End diff --

To make it clear once again: was your finding a false alarm, or is there something to fix that I can reproduce easily?
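In case a concrete reference helps frame the question, the lifecycle these tests exercise boils down to the rough sketch below. It is only an outline, not an additional test being proposed, and it assumes it sits inside FetchedPoolSuite next to the tests above, so CacheKey, FetchedDataPool, and the suite's testRecords helper are all in scope.

    // the first acquire for a key creates a cached entry and marks it in use
    val pool = FetchedDataPool.build
    val key = CacheKey("testgroup", new TopicPartition("topic", 0))
    val data = pool.acquire(key, 0)

    // the task attaches freshly polled records and consumes part of them
    data.withNewPoll(testRecords(0, 5).listIterator, 5)
    (0 to 3).foreach { _ => data.next() }

    // releasing keeps the entry cached but marks it as no longer in use
    pool.release(key, data)

    // a later batch that asks for the offset where the previous task stopped
    // gets the same data object back instead of a fresh one
    val reused = pool.acquire(key, data.nextOffsetInFetchedData)
    assert(reused.eq(data))

    pool.release(key, reused)
    pool.shutdown()

The "evict idle fetched data" case in question is this lifecycle plus the pool eventually dropping entries that stay released, which is what the truncated test above is meant to cover.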