[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2018-08-17 Thread ScrapCodes
Github user ScrapCodes closed the pull request at:

https://github.com/apache/spark/pull/18143





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2018-01-17 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r162045340
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private var consumer = createConsumer
 
-  /** indicates whether this consumer is in use or not */
-  private var inuse = true
--- End diff --

Now that we have moved to using object pools, this tracking is no longer 
required.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636878
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
+if (cache == null) {
+  val duration =
+
SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", 
"30m")
--- End diff --

All of the other configs in the dstream module are of the form 
"spark.streaming.kafka.consumer.cache.something". Not sure whether it's better 
to be consistent with the sql config or with the dstream config.
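
For reference, a minimal sketch of how either candidate name would be read from SparkConf; both names here are just the two options under discussion, not a settled choice:

    import org.apache.spark.SparkEnv

    val conf = SparkEnv.get.conf
    // dstream-style naming, matching the existing spark.streaming.kafka.consumer.cache.* configs
    val dstreamTimeout = conf.getTimeAsMs("spark.streaming.kafka.consumer.cache.timeout", "30m")
    // sql-style naming, matching spark.sql.kafkaConsumerCache.*
    val sqlTimeout = conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")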





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636950
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
--- End diff --

Load factor is being ignored at this point, yes?





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119636360
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,42 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
+if (cache == null) {
+  val duration =
+
SparkEnv.get.conf.getTimeAsMs("spark.streaming.kafkaConsumerCache.timeout", 
"30m")
+  val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer[_, _]] {
+override def onRemoval(
+n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): 
Unit = {
+  n.getCause match {
+case RemovalCause.SIZE =>
+  logWarning(
+s"Evicting consumer ${n.getKey}," +
+  s" due to size limit reached. Capacity: $maxCapacity.")
+case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+  }
+  try {
+n.getValue.close()
+  } catch {
+case NonFatal(e) =>
+  logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+s" evicted from cache due to ${n.getCause}", e)
   }
 }
   }
+
+  cache = CacheBuilder.newBuilder()
+.maximumSize(maxCapacity).removalListener(removalListener)
--- End diff --

initialCapacity is being passed in but not used; this should probably set the 
initial capacity on the builder as well.
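
A minimal sketch of the builder with the initial capacity applied; note that Guava's CacheBuilder has no load-factor knob, so only initialCapacity and maximumSize carry over (other names as in the quoted hunk):

    cache = CacheBuilder.newBuilder()
      .initialCapacity(initialCapacity)
      .maximumSize(maxCapacity)
      .removalListener(removalListener)
      .build[CacheKey, CachedKafkaConsumer[_, _]]()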





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119582631
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private var consumer = createConsumer
 
-  /** indicates whether this consumer is in use or not */
-  private var inuse = true
--- End diff --

Thanks, good point! According to the Guava cache docs, when the cache 
approaches the size limit, "the cache will try to evict entries that haven't 
been used recently or very often."
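
A tiny self-contained illustration of that size-based eviction, with toy keys rather than the consumer cache itself:

    import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}

    val demo = CacheBuilder.newBuilder()
      .concurrencyLevel(1)   // keeps this toy example deterministic
      .maximumSize(2)
      .removalListener(new RemovalListener[String, String] {
        override def onRemoval(n: RemovalNotification[String, String]): Unit =
          println(s"evicted ${n.getKey}, cause: ${n.getCause}")   // cause is SIZE here
      })
      .build[String, String]()

    demo.put("a", "1")
    demo.put("b", "2")
    demo.put("c", "3")   // exceeds maximumSize, so the least recently used entry ("a") is evicted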





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-06-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119562656
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -45,9 +46,6 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private var consumer = createConsumer
 
-  /** indicates whether this consumer is in use or not */
-  private var inuse = true
--- End diff --

Don't we need to track this anymore? Will we evict a consumer in use?





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119374298
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -147,20 +155,14 @@ object CachedKafkaConsumer extends Logging {
   groupId: String,
   topic: String,
   partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
-CachedKafkaConsumer.synchronized {
+  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = {
   val k = CacheKey(groupId, topic, partition)
-  val v = cache.get(k)
-  if (null == v) {
-logInfo(s"Cache miss for $k")
-logDebug(cache.keySet.toString)
-val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
-cache.put(k, c)
-c
-  } else {
-// any given topicpartition should have a consistent key and value 
type
-v.asInstanceOf[CachedKafkaConsumer[K, V]]
-  }
+  val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
+override def call(): CachedKafkaConsumer[K, V] = {
+  new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
--- End diff --

I think it's worth keeping the info / debug level logging for when a cache 
miss has happened.
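
A sketch of keeping that logging inside the Guava loader, reusing k, groupId, topic, partition and kafkaParams from the quoted hunk:

    val v = cache.get(k, new Callable[CachedKafkaConsumer[_, _]] {
      override def call(): CachedKafkaConsumer[K, V] = {
        logInfo(s"Cache miss for $k")
        logDebug(cache.asMap().keySet().toString)
        new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
      }
    })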





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119371920
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
-  }
+val duration = 
SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer[_, _]] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer[_, _]]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
+  s"Evicting consumer ${n.getKey}, due to size limit reached. 
Capacity: $maxCapacity.")
+  case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+}
+try {
+  n.getValue.close()
+} catch {
+  case NonFatal(e) =>
+logWarning(s"Error in closing Kafka consumer: ${n.getKey}," +
+  s" evicted from cache due to ${n.getCause}", e)
 }
   }
 }
+if (cache == null) {
--- End diff --

Shouldn't the whole function be guarded by this if statement?  Is there any 
reason to construct a removal listener otherwise?
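
Agreed that the listener only needs to exist when the cache is first created. A minimal sketch of the fully guarded init; the config name is the one from the quoted hunk (which the naming discussion above also questions), and the expireAfterAccess call is an assumption, since the hunk cuts off before duration is used:

    def init(
        initialCapacity: Int,
        maxCapacity: Int,
        loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
      if (cache == null) {
        val duration =
          SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
        val removalListener = new RemovalListener[CacheKey, CachedKafkaConsumer[_, _]] {
          override def onRemoval(
              n: RemovalNotification[CacheKey, CachedKafkaConsumer[_, _]]): Unit = {
            logDebug(s"Evicting consumer ${n.getKey}, cause: ${n.getCause}")
            try {
              n.getValue.close()
            } catch {
              case NonFatal(e) =>
                logWarning(s"Error in closing Kafka consumer ${n.getKey}", e)
            }
          }
        }
        cache = CacheBuilder.newBuilder()
          .initialCapacity(initialCapacity)
          .maximumSize(maxCapacity)
          .expireAfterAccess(duration, TimeUnit.MILLISECONDS)
          .removalListener(removalListener)
          .build[CacheKey, CachedKafkaConsumer[_, _]]()
      }
    }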





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119371285
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -109,34 +113,38 @@ object CachedKafkaConsumer extends Logging {
 
   private case class CacheKey(groupId: String, topic: String, partition: 
Int)
 
-  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
-  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+  private var cache: Cache[CacheKey, CachedKafkaConsumer[_, _]] = null
 
   /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored */
   def init(
   initialCapacity: Int,
   maxCapacity: Int,
   loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
-if (null == cache) {
-  logInfo(s"Initializing cache $initialCapacity $maxCapacity 
$loadFactor")
-  cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
-initialCapacity, loadFactor, true) {
-override def removeEldestEntry(
-  entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): 
Boolean = {
-  if (this.size > maxCapacity) {
-try {
-  entry.getValue.consumer.close()
-} catch {
-  case x: KafkaException =>
-logError("Error closing oldest Kafka consumer", x)
-}
-true
-  } else {
-false
-  }
+val duration = 
SparkEnv.get.conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "30m")
--- End diff --

I don't think this should be using configuration from the spark.sql namespace.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-31 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119277567
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
--- End diff --

This is used later to construct the warning log message.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119132090
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
--- End diff --

It seems like the general trend towards default configuration values is to 
make them work for the lowest common denominator use case, in which case I'd 
argue for a longer (30 min?) default timeout.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119131006
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
--- End diff --

It shouldn't really be a normal operation.  If capacity is smaller than the 
number of partitions that are regularly being assigned to a given node, it's 
going to kill performance due to recreating consumers every batch.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119114508
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
--- End diff --

Why declare a type?





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119116101
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
+  s"""
+ |Evicting consumer ${n.getKey}, due to size limit 
reached. Capacity: $capacity.
+ |This can be configured using $capacityConfigString.
+ |.stripMargin)
+  case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+}
+try {
+  n.getValue.close()
+} catch {
+  case NonFatal(e) =>
+logError(s"Error closing Kafka consumer: ${n.getKey}, evicted 
due to ${n.getCause}", e)
 }
   }
 }
-  }
-
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
 
-synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
+val guavaCache: Cache[CacheKey, CachedKafkaConsumer] =
--- End diff --

Why bother declaring this `val`?
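
A sketch of returning the builder expression directly, so the lazy val block needs no intermediate name; the expireAfterAccess call is an assumption here, since the quoted hunk computes duration but its use is cut off:

    CacheBuilder.newBuilder()
      .maximumSize(capacity)
      .expireAfterAccess(duration, TimeUnit.MILLISECONDS)
      .removalListener(removalListener)
      .build[CacheKey, CachedKafkaConsumer]()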





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119116252
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
+  s"""
+ |Evicting consumer ${n.getKey}, due to size limit 
reached. Capacity: $capacity.
+ |This can be configured using $capacityConfigString.
+ |.stripMargin)
+  case _ => logDebug(s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}")
+}
+try {
+  n.getValue.close()
+} catch {
+  case NonFatal(e) =>
+logError(s"Error closing Kafka consumer: ${n.getKey}, evicted 
due to ${n.getCause}", e)
--- End diff --

On the flip side, this isn't really an error -- you can recover by just 
continuing.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119114480
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -310,62 +308,45 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 
   private lazy val cache = {
 val conf = SparkEnv.get.conf
-val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val capacityConfigString: String = 
"spark.sql.kafkaConsumerCache.capacity"
+val capacity = conf.getInt(capacityConfigString, 64)
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+n.getCause match {
+  case RemovalCause.SIZE =>
+logWarning(
--- End diff --

I still don't see why this is a warning; it's normal operation.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119088849
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -18,19 +18,19 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
 
+import com.google.common.cache._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
--- End diff --

Got it, you mean the Scala imports come right after the Java ones.
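
For reference, the grouping the Spark style asks for, using only the imports from the quoted hunk (the java group first, then scala, then third-party):

    import java.{util => ju}
    import java.util.concurrent.{Callable, TimeoutException, TimeUnit}

    import scala.collection.JavaConverters._
    import scala.util.control.NonFatal

    import com.google.common.cache._
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
    import org.apache.kafka.common.TopicPartition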





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119085683
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 // If this is reattempt at running the task, then invalidate cache and 
start with
 // a new consumer
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
-  cache.put(key, consumer)
-  consumer
-} else {
-  if (!cache.containsKey(key)) {
-cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
-  }
-  val consumer = cache.get(key)
-  consumer.inuse = true
-  consumer
+  cache.invalidate(key)
 }
+
+val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
--- End diff --

AFAIK, this is possible from Scala 2.12 onwards: 
[reference](http://www.scala-lang.org/news/2.12.0#lambda-syntax-for-sam-types)
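
For comparison, the two forms, reusing the names from the quoted hunk; the lambda form relies on SAM conversion and so only compiles on Scala 2.12+:

    // Scala 2.11: explicit anonymous class for Guava's Callable loader
    val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
      override def call(): CachedKafkaConsumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    })

    // Scala 2.12+: a function literal converts to the single-method Callable interface
    val loader: Callable[CachedKafkaConsumer] = () => new CachedKafkaConsumer(topicPartition, kafkaParams)
    val consumer212 = cache.get(key, loader)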





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119084772
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -18,19 +18,19 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
 
+import com.google.common.cache._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
--- End diff --

Thanks @srowen for taking a look. I am a bit unsure how they are missorted. 
BTW, is this not fully covered by our scalastyle check?





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119073041
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -311,61 +311,40 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
   private lazy val cache = {
 val conf = SparkEnv.get.conf
 val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
-new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, 
true) {
-  override def removeEldestEntry(
-entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-if (entry.getValue.inuse == false && this.size > capacity) {
-  logWarning(s"KafkaConsumer cache hitting max capacity of 
$capacity, " +
-s"removing consumer for ${entry.getKey}")
-  try {
-entry.getValue.close()
-  } catch {
-case e: SparkException =>
-  logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-  }
-  true
-} else {
-  false
+val duration = 
conf.getTimeAsMs("spark.sql.kafkaConsumerCache.timeout", "10m")
+val removalListener = new RemovalListener[CacheKey, 
CachedKafkaConsumer] {
+  override def onRemoval(n: RemovalNotification[CacheKey, 
CachedKafkaConsumer]): Unit = {
+val logMsg: String = s"Evicting consumer ${n.getKey}, cause: 
${n.getCause}"
+n.getCause match {
+  case RemovalCause.SIZE => logWarning(logMsg)
--- End diff --

Why is removing for size a warning?
You probably want different messages then, and, if so, you should build each 
message string inside its own log invocation. Otherwise you're always 
constructing the message, even in the common path where the debug message is ignored.
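
Since Logging's logWarning and logDebug take the message by name, the interpolation only runs when that level is enabled; building a distinct string inside each branch avoids the cost on the common debug-off path. A sketch reusing n and capacity from the quoted hunk:

    n.getCause match {
      case RemovalCause.SIZE =>
        logWarning(s"Evicting consumer ${n.getKey}: cache capacity of $capacity reached")
      case cause =>
        logDebug(s"Evicting consumer ${n.getKey}, cause: $cause")
    }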





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119073318
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 // If this is reattempt at running the task, then invalidate cache and 
start with
 // a new consumer
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
-  cache.put(key, consumer)
-  consumer
-} else {
-  if (!cache.containsKey(key)) {
-cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
-  }
-  val consumer = cache.get(key)
-  consumer.inuse = true
-  consumer
+  cache.invalidate(key)
 }
+
+val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
--- End diff --

I'm probably wrong about this, but does Scala let you write a simple lambda 
here, since it's an interface with just one method? Might be clearer if so.





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119073484
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -383,19 +362,16 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 // If this is reattempt at running the task, then invalidate cache and 
start with
 // a new consumer
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
-  cache.put(key, consumer)
-  consumer
-} else {
-  if (!cache.containsKey(key)) {
-cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
-  }
-  val consumer = cache.get(key)
-  consumer.inuse = true
-  consumer
+  cache.invalidate(key)
 }
+
+val consumer = cache.get(key, new Callable[CachedKafkaConsumer] {
+  override def call(): CachedKafkaConsumer = {
+new CachedKafkaConsumer(topicPartition, kafkaParams)
+  }
+})
+consumer.inuse = true
--- End diff --

I think there's a race condition now, because modifying inuse isn't done 
atomically with the cache lookup anymore.
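
One way to make the lookup and the flag update atomic again would be to take both under the same lock, which is roughly what the pre-change code did; a rough sketch with the names from the quoted hunk, at the cost of serializing consumer creation:

    val consumer = synchronized {
      val c = cache.get(key, new Callable[CachedKafkaConsumer] {
        override def call(): CachedKafkaConsumer =
          new CachedKafkaConsumer(topicPartition, kafkaParams)
      })
      c.inuse = true
      c
    }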





[GitHub] spark pull request #18143: [SPARK-20919][SS] Simplificaiton of CachedKafkaCo...

2017-05-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18143#discussion_r119072852
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -18,19 +18,19 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.{Callable, TimeoutException, TimeUnit}
 
+import com.google.common.cache._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
--- End diff --

These are missorted now; the imports shouldn't move.

