[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018501#comment-17018501
 ] 

John Roesler commented on KAFKA-9450:
-

This reminds me of an idea I proposed a while back, and haven't been able to 
let go of. I think it's buried in a Jira ticket somewhere.

One practical complexity with flushing is guaranteeing that persistent stores 
actually do an fsync call before we write the checkpoint file, which in turn 
is to guarantee that in a crash-recovery scenario, the offset in the recovered 
checkpoint file is always equal to or before the state of the recovered store.

The same goal could be accomplished without any filesystem intricacies if we 
store the offset in the same store as the data. Think: either a reserved key, 
or a separate column family. This would allow the underlying store to ensure 
the order of data updates with respect to changelog offset updates on its own 
(using its internal translog or whatever).

Anyway, I bring this up right now because offhand, I can't think of any reason 
we'd actually need to flush bytes stores at all if we did things that way.
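
To make the idea concrete, here is a minimal in-memory sketch, not Kafka Streams code. The class name, the reserved key, and the method names are all hypothetical, and a plain map stands in for a real store (RocksDB, for example, offers atomic write batches). It only illustrates that data and changelog offset become visible together, so a recovered offset can never be ahead of the recovered data.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: a key-value store that keeps the changelog offset
 * under a reserved key and applies it in the same atomic batch as the data.
 */
final class OffsetInStoreSketch {
    // Reserved key (an assumption); it must never collide with a user key.
    static final String OFFSET_KEY = "__changelog_offset__";

    private Map<String, String> committed = new HashMap<>();

    /** Applies the data updates and the matching offset as one all-or-nothing step. */
    synchronized void applyBatch(Map<String, String> updates, long changelogOffset) {
        Map<String, String> next = new HashMap<>(committed);
        next.putAll(updates);
        next.put(OFFSET_KEY, Long.toString(changelogOffset));
        committed = next; // one reference swap: the whole batch appears, or none of it
    }

    synchronized String get(String key) {
        return committed.get(key);
    }

    /** Offset stored alongside the data; -1 if no batch was ever applied. */
    synchronized long recoveredOffset() {
        String raw = committed.get(OFFSET_KEY);
        return raw == null ? -1L : Long.parseLong(raw);
    }
}
```

With this shape there is no separate checkpoint file to order against the data, which is why the fsync-before-checkpoint step would no longer be needed.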

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}
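
A small Java sketch of the timed form of await, for reference. The class and method names are hypothetical; only `CountDownLatch.await(long, TimeUnit)` is the real JDK call. The point is that the timed overload returns a boolean, so the caller has to decide what to do when it returns false (the latch did not reach zero in time) rather than carry on as if every response had arrived.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class BoundedAwaitSketch {
    /**
     * Waits for the latch for at most timeoutMs milliseconds.
     * Returns true if the count reached zero, false on timeout or interrupt.
     */
    static boolean awaitBounded(CountDownLatch latch, long timeoutMs) {
        try {
            // Unlike await(), this overload cannot block forever.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }
}
```

In the patch above the return value is discarded, so a timeout would fall through to reading responseQueue with some responses possibly missing; whether that is acceptable is a separate question from bounding the wait.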


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurred, the only way to 
> recover the Kafka cluster was to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:22 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurred, the only way to 
> recover the Kafka cluster was to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:21 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurred, the only way to 
> recover the Kafka cluster was to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> 

[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2020-01-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018460#comment-17018460
 ] 

Matthias J. Sax commented on KAFKA-7658:


Yes, it's ok to pick this up and assign it to yourself. The KIP was accepted 
already, hence, it's just a matter of implementing it. [~ash26389] did go silent 
so I assume she is not interested in this any longer.

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Aishwarya Pradeep Kumar
>Priority: Major
>  Labels: kip, newbie
>
> KIP-523: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>  
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy 
> {{KStream.reduce()}} function that always takes the new value, as it has the 
> following differences:
> 1) an aggregation operator of {{KStream}} aggregates an event stream 
> into an evolving table and drops null-values from the input event 
> stream; whereas a {{toTable}} function completely changes the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulting bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas 
> the KTable resulting from {{toTable}} may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning) should use this new API instead of 
> the dummy reduction function.
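
The null-handling difference in point 1 can be illustrated with plain Java maps. This is only a sketch of the semantics described above, not the Kafka Streams API: the class and method names are hypothetical, serialization is ignored, and a HashMap stands in for the materialized table.

```java
import java.util.HashMap;
import java.util.Map;

final class ToTableSemanticsSketch {
    /** Aggregation-style update: null values are dropped before the reducer runs. */
    static void reduceKeepNewest(Map<String, String> table, String key, String value) {
        if (value == null) {
            return; // record skipped, any existing entry survives
        }
        table.merge(key, value, (oldValue, newValue) -> newValue);
    }

    /** Changelog-style update: a null value is a tombstone that deletes the key. */
    static void upsertAsChangelog(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }
}
```

Feeding the same two records, ("k", "v1") then ("k", null), leaves "k" mapped to "v1" under the reduce-style update but removes "k" under the changelog-style update.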





[jira] [Commented] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-01-17 Thread Xue Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018448#comment-17018448
 ] 

Xue Liu commented on KAFKA-9440:


Hi [~bchen225242], I am pretty new to the Kafka community. Can I take this up? My 
team is using static membership to avoid excessive rebalancing. 

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to expose the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP; I am just posting it here in case anyone who 
> uses static membership would like to pick it up.





[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2020-01-17 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018432#comment-17018432
 ] 

highluck commented on KAFKA-7658:
-


I wanted to ask:
is it okay if I open a PR?

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Aishwarya Pradeep Kumar
>Priority: Major
>  Labels: kip, newbie
>
> KIP-523: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>  
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy 
> {{KStream.reduce()}} function that always takes the new value, as it has the 
> following differences:
> 1) an aggregation operator of {{KStream}} aggregates an event stream 
> into an evolving table and drops null-values from the input event 
> stream; whereas a {{toTable}} function completely changes the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulting bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas 
> the KTable resulting from {{toTable}} may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning) should use this new API instead of 
> the dummy reduction function.





[jira] [Commented] (KAFKA-5381) ERROR Uncaught exception in scheduled task 'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)

2020-01-17 Thread mjuarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018380#comment-17018380
 ] 

mjuarez commented on KAFKA-5381:


We ran into the same exception with Kafka version 1.1.1.
{noformat}
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.49.6:58884-36452 (kafka.network.Processor)
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.52.52:37878-36458 (kafka.network.Processor)
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.37.248:43442-36340 (kafka.network.Processor)
[2020-01-17 02:33:55,206] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.16.9:54542-35541 (kafka.network.Processor)
[2020-01-17 02:33:55,207] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.36.83:53798-36337 (kafka.network.Processor)
[2020-01-17 02:33:55,215] INFO [Partition sample_topic_error-9 broker=6] 
Shrinking ISR from 6,2,1 to 6 (kafka.cluster.Partition)
[2020-01-17 02:33:55,216] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.43.25:47680-36360 (kafka.network.Processor)
[2020-01-17 02:33:55,220] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.36.38:53360-36328 (kafka.network.Processor)
[2020-01-17 02:33:55,240] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.51.81:38058-36536 (kafka.network.Processor)
[2020-01-17 02:33:55,240] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.41.225:49834-36329 (kafka.network.Processor)
[2020-01-17 02:33:55,283] INFO [GroupCoordinator 6]: Preparing to rebalance 
group light_group_v3 with old generation 226 (__consumer_offsets-32) 
(kafka.coordinator.group.GroupCoordinator)
[2020-01-17 02:33:55,296] INFO [ZooKeeperClient] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,296] INFO [ZooKeeperClient] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,797] INFO [GroupCoordinator 6]: Preparing to rebalance 
group analytics_completed with old generation 394 (__consumer_offsets-47) 
(kafka.coordinator.group.GroupCoordinator)
[2020-01-17 02:33:55,913] INFO [ZooKeeperClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,915] INFO [ZooKeeperClient] Initializing a new session to 
prod-zk1.prod.core.company.org:2181,prod-zk2.prod.core.company.org:2181,prod-zk3.prod.core.company.org:2181.
 (kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,916] INFO Creating /brokers/ids/6 (is it secure? false) 
(kafka.zk.KafkaZkClient)
[2020-01-17 02:33:55,921] INFO Processing notification(s) to /config/changes 
(kafka.common.ZkNodeChangeNotificationListener)
[2020-01-17 02:33:55,922] INFO Result of znode creation at /brokers/ids/6 is: 
OK (kafka.zk.KafkaZkClient)
[2020-01-17 02:33:55,922] INFO Registered broker 6 at path /brokers/ids/6 with 
addresses: 
ArrayBuffer(EndPoint(prod-kafka6.prod.core.company.org,9092,ListenerName(PLAINTEXT),PLAINTEXT))
 (kafka.zk.KafkaZkClient)
[2020-01-17 02:33:56,016] ERROR Uncaught exception in scheduled task 
'isr-expiration' (kafka.utils.KafkaScheduler)
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/sample_topic_error/partitions/9/state
at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:487)
at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:631)
at 
kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
at 
kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:727)
at kafka.cluster.Partition$$anonfun$2.apply$mcZ$sp(Partition.scala:545)
at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:536)
at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:536)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:535)
at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1330)
at 

[jira] [Updated] (KAFKA-9451) Pass consumer group metadata to producer on commit

2020-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9451:
---
Description: Using the producer-per-thread EOS design, we need to pass the 
consumer group metadata into `producer.sendOffsetsToTransaction()` to use the 
new consumer group coordinator fencing mechanism. We should also reduce the 
default transaction timeout to 10 seconds (compare the KIP for details).  (was: 
Using the producer-per-thread EOS design, we need to pass the consumer group 
metadata into `producer.sendOffsetsToTransaction()`.)

> Pass consumer group metadata to producer on commit
> --
>
> Key: KAFKA-9451
> URL: https://issues.apache.org/jira/browse/KAFKA-9451
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> Using the producer-per-thread EOS design, we need to pass the consumer group 
> metadata into `producer.sendOffsetsToTransaction()` to use the new consumer 
> group coordinator fencing mechanism. We should also reduce the default 
> transaction timeout to 10 seconds (compare the KIP for details).





[jira] [Assigned] (KAFKA-9451) Pass consumer group metadata to producer on commit

2020-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9451:
--

Assignee: Matthias J. Sax

> Pass consumer group metadata to producer on commit
> --
>
> Key: KAFKA-9451
> URL: https://issues.apache.org/jira/browse/KAFKA-9451
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> Using the producer-per-thread EOS design, we need to pass the consumer group 
> metadata into `producer.sendOffsetsToTransaction()`.





[jira] [Created] (KAFKA-9451) Pass consumer group metadata to producer on commit

2020-01-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9451:
--

 Summary: Pass consumer group metadata to producer on commit
 Key: KAFKA-9451
 URL: https://issues.apache.org/jira/browse/KAFKA-9451
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax


Using the producer-per-thread EOS design, we need to pass the consumer group 
metadata into `producer.sendOffsetsToTransaction()`.





[jira] [Resolved] (KAFKA-9338) Incremental fetch sessions do not maintain or use leader epoch for fencing purposes

2020-01-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9338.

Fix Version/s: 2.5.0
   Resolution: Fixed

Marking this just as 2.5 for now. If we don't find any problems, we will 
backport to 2.4 at least.

> Incremental fetch sessions do not maintain or use leader epoch for fencing 
> purposes
> ---
>
> Key: KAFKA-9338
> URL: https://issues.apache.org/jira/browse/KAFKA-9338
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.5.0
>
>
> KIP-320 adds the ability to fence replicas by detecting stale leader epochs 
> from followers, and helping consumers handle unclean truncation.
> Unfortunately the incremental fetch session handling does not maintain or use 
> the leader epoch in the fetch session cache. As a result, it does not appear 
> that the leader epoch is used for fencing a majority of the time. I'm not 
> sure if this is only the case after incremental fetch sessions are 
> established - it may be the case that the first "full" fetch session is safe.
> Optional.empty is returned for the FetchRequest.PartitionData here:
> [https://github.com/apache/kafka/blob/a4cbdc6a7b3140ccbcd0e2339e28c048b434974e/core/src/main/scala/kafka/server/FetchSession.scala#L111]
> I believe this affects brokers from 2.1.0 when fencing was improved on the 
> replica fetcher side, and 2.3.0 and above for consumers, which is when client 
> side truncation detection was added on the consumer side.





[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 10:42 PM:
-

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute (yet to be verified by checking 
the server log for an exception in this code path).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
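
As a minimal sketch of the bounded wait suggested above (not the actual ZooKeeperClient code), the timed form of `CountDownLatch.await` returns `false` when the timeout elapses instead of parking forever. The class name `BoundedAwait` and the helper `awaitBounded` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Kafka code: waits on a latch with an upper bound.
final class BoundedAwait {
    private BoundedAwait() {}

    /**
     * Returns true if the latch reached zero within the timeout,
     * false if the timeout elapsed or the waiting thread was interrupted.
     */
    static boolean awaitBounded(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }
}
```

A caller would then have to decide what a `false` result means (retry, fail the request, or log and continue); that policy is outside the scope of this sketch.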


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute.
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for 

[jira] [Commented] (KAFKA-9338) Incremental fetch sessions do not maintain or use leader epoch for fencing purposes

2020-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018370#comment-17018370
 ] 

ASF GitHub Bot commented on KAFKA-9338:
---

hachikuji commented on pull request #7970: KAFKA-9338; Fetch session should 
cache request leader epoch
URL: https://github.com/apache/kafka/pull/7970
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Incremental fetch sessions do not maintain or use leader epoch for fencing 
> purposes
> ---
>
> Key: KAFKA-9338
> URL: https://issues.apache.org/jira/browse/KAFKA-9338
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jason Gustafson
>Priority: Major
>
> KIP-320 adds the ability to fence replicas by detecting stale leader epochs 
> from followers, and helping consumers handle unclean truncation.
> Unfortunately the incremental fetch session handling does not maintain or use 
> the leader epoch in the fetch session cache. As a result, it does not appear 
> that the leader epoch is used for fencing a majority of the time. I'm not 
> sure if this is only the case after incremental fetch sessions are 
> established - it may be the case that the first "full" fetch session is safe.
> Optional.empty is returned for the FetchRequest.PartitionData here:
> [https://github.com/apache/kafka/blob/a4cbdc6a7b3140ccbcd0e2339e28c048b434974e/core/src/main/scala/kafka/server/FetchSession.scala#L111]
> I believe this affects brokers from 2.1.0 when fencing was improved on the 
> replica fetcher side, and 2.3.0 and above for consumers, which is when client 
> side truncation detection was added on the consumer side.





[jira] [Commented] (KAFKA-9329) KafkaController::replicasAreValid should return error

2020-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018369#comment-17018369
 ] 

ASF GitHub Bot commented on KAFKA-9329:
---

hachikuji commented on pull request #7865: KAFKA-9329. 
KafkaController::replicasAreValid should return error
URL: https://github.com/apache/kafka/pull/7865
 
 
   
 



> KafkaController::replicasAreValid should return error
> -
>
> Key: KAFKA-9329
> URL: https://issues.apache.org/jira/browse/KAFKA-9329
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> The method currently returns a boolean indicating if replicas are valid or 
> not. But the failure condition loses any context on why replicas are not 
> valid. We should return the error condition along with success/failure.
> Maybe change method name to something like `validateReplicas` too.
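
One way to read the proposal above is a validation method that returns the reason for failure rather than a bare boolean. The following is only an illustrative sketch: the class `ReplicaValidation`, the `validateReplicas` signature, and the specific checks are assumptions, not the logic in KafkaController.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration; names and checks are assumptions, not KafkaController's actual logic.
final class ReplicaValidation {
    private ReplicaValidation() {}

    /**
     * Returns Optional.empty() when the assignment passes every check,
     * otherwise a description of the first problem found.
     */
    static Optional<String> validateReplicas(List<Integer> replicas, Set<Integer> knownBrokers) {
        if (replicas.isEmpty()) {
            return Optional.of("replica list is empty");
        }
        Set<Integer> seen = new HashSet<>();
        for (Integer replica : replicas) {
            if (!seen.add(replica)) {
                return Optional.of("duplicate replica id " + replica);
            }
            if (!knownBrokers.contains(replica)) {
                return Optional.of("unknown broker id " + replica);
            }
        }
        return Optional.empty();
    }
}
```

The caller keeps the success/failure decision (`isPresent()`) while the message carries the context that a boolean would lose.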





[jira] [Resolved] (KAFKA-9329) KafkaController::replicasAreValid should return error

2020-01-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9329.

Fix Version/s: 2.5.0
   Resolution: Fixed

> KafkaController::replicasAreValid should return error
> -
>
> Key: KAFKA-9329
> URL: https://issues.apache.org/jira/browse/KAFKA-9329
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
> Fix For: 2.5.0
>
>
> The method currently returns a boolean indicating if replicas are valid or 
> not. But the failure condition loses any context on why replicas are not 
> valid. We should return the error condition along with success/failure.
> Maybe change method name to something like `validateReplicas` too.





[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took a very long time to execute.
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Resolved] (KAFKA-9449) Producer's BufferPool may block the producer from closing.

2020-01-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9449.

Fix Version/s: 2.4.1
   Resolution: Fixed

> Producer's BufferPool may block the producer from closing.
> --
>
> Key: KAFKA-9449
> URL: https://issues.apache.org/jira/browse/KAFKA-9449
> Project: Kafka
>  Issue Type: Bug
>Reporter: Brian Byrne
>Assignee: Brian Byrne
>Priority: Major
> Fix For: 2.4.1
>
>
> The producer's BufferPool may block allocations if its memory limit has hit 
> capacity. If the producer is closed, it's possible for the allocation waiters 
> to wait for max.block.ms if progress cannot be made, even when force-closed 
> (immediate), which can cause indefinite blocking if max.block.ms is 
> particularly high.
> The BufferPool should be made close-able, which should immediately wake up any 
> waiters that are pending allocations and throw a "producer is closing" 
> exception.





[jira] [Commented] (KAFKA-9449) Producer's BufferPool may block the producer from closing.

2020-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018364#comment-17018364
 ] 

ASF GitHub Bot commented on KAFKA-9449:
---

hachikuji commented on pull request #7967: KAFKA-9449: Adds support for closing 
the producer's BufferPool.
URL: https://github.com/apache/kafka/pull/7967
 
 
   
 



> Producer's BufferPool may block the producer from closing.
> --
>
> Key: KAFKA-9449
> URL: https://issues.apache.org/jira/browse/KAFKA-9449
> Project: Kafka
>  Issue Type: Bug
>Reporter: Brian Byrne
>Assignee: Brian Byrne
>Priority: Major
>
> The producer's BufferPool may block allocations if its memory limit has hit 
> capacity. If the producer is closed, it's possible for the allocation waiters 
> to wait for max.block.ms if progress cannot be made, even when force-closed 
> (immediate), which can cause indefinite blocking if max.block.ms is 
> particularly high.
> The BufferPool should be made close-able, which should immediately wake up any 
> waiters that are pending allocations and throw a "producer is closing" 
> exception.





[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018352#comment-17018352
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at KafkaController.scala in trunk, I don't see 
Expire.waitUntilProcessingStarted shown in the stack trace.
It seems the class has gone through refactoring / bug fix.

[~lbdai3190]
If you can attach server log, that may help us find the root cause.

Thanks

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018279#comment-17018279
 ] 

Jun Rao commented on KAFKA-8532:


[~lbdai3190]: Sorry for the late reply. Took a look at your last comment. If we 
actually hit an exception in ZooKeeperClient.handleRequests(), the caller won't 
be blocked on countDownLatch.await() since we won't even reach that code. So, 
Ted's PR does make the code cleaner, but I am not sure if that solves this 
particular problem. The main question is why ZooKeeperClient.handleRequests() 
blocks at countDownLatch.await(). At that point, the ZK session has expired. 
All pending calls on that expired ZK session should complete with a 
SessionExpired error through the response callback. 

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> 

[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-01-17 Thread Raman Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018259#comment-17018259
 ] 

Raman Gupta commented on KAFKA-8803:


And it happened again today with the same stream. Offsets were / are not 
expired so never mind my last theory.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.





[jira] [Created] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-17 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9450:
--

 Summary: Decouple inner state flushing from committing with EOS
 Key: KAFKA-9450
 URL: https://issues.apache.org/jira/browse/KAFKA-9450
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


When EOS is turned on, the commit interval is set quite low (100ms) and all the 
store layers are flushed during a commit. This is necessary for forwarding 
records in the cache to the changelog, but unfortunately also forces rocksdb to 
flush the current memtable before it's full. The result is a large number of 
small writes to disk, losing the benefits of batching, and a large number of 
very small L0 files that are likely to slow compaction.

Since we have to delete the stores to recreate from scratch anyways during an 
unclean shutdown with EOS, we may as well skip flushing the innermost 
StateStore during a commit and only do so during a graceful shutdown, before a 
rebalance, etc. This is currently blocked on a refactoring of the state store 
layers to allow decoupling the flush of the caching layer from the actual state 
store.
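
The decoupling described above can be sketched with a toy two-layer store. This is a sketch under stated assumptions, not Kafka Streams code: `LayeredStore`, `flushCache`, `flushAll`, and the in-memory "memtable" and "changelog" stand-ins are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-layer store; class and method names are illustrative, not Kafka Streams APIs.
final class LayeredStore {
    private final Map<String, String> cache = new LinkedHashMap<>();    // dirty entries not yet forwarded
    private final Map<String, String> memtable = new LinkedHashMap<>(); // inner store's in-memory buffer
    final List<String> changelog = new ArrayList<>();                   // stand-in for the changelog topic
    int innerFlushCount = 0;                                            // number of forced inner-store flushes

    void put(String key, String value) {
        cache.put(key, value);
    }

    /** Commit path: forward cached records to the changelog and inner store without forcing a disk flush. */
    void flushCache() {
        for (Map.Entry<String, String> entry : cache.entrySet()) {
            changelog.add(entry.getKey() + "=" + entry.getValue());
            memtable.put(entry.getKey(), entry.getValue());
        }
        cache.clear();
    }

    /** Graceful shutdown or pre-rebalance path: forward the cache, then persist the inner store once. */
    void flushAll() {
        flushCache();
        memtable.clear();   // pretend the whole memtable was written out as a single file
        innerFlushCount++;
    }
}
```

With this split, frequent commits call only `flushCache()`, so the inner store can keep batching writes, and `flushAll()` is reserved for the less frequent cases named in the description.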





[jira] [Created] (KAFKA-9449) Producer's BufferPool may block the producer from closing.

2020-01-17 Thread Brian Byrne (Jira)
Brian Byrne created KAFKA-9449:
--

 Summary: Producer's BufferPool may block the producer from closing.
 Key: KAFKA-9449
 URL: https://issues.apache.org/jira/browse/KAFKA-9449
 Project: Kafka
  Issue Type: Bug
Reporter: Brian Byrne
Assignee: Brian Byrne


The producer's BufferPool may block allocations if its memory limit has hit 
capacity. If the producer is closed, it's possible for the allocation waiters 
to wait for max.block.ms if progress cannot be made, even when force-closed 
(immediate), which can cause indefinite blocking if max.block.ms is 
particularly high.

The BufferPool should be made close-able, which should immediately wake up any 
waiters that are pending allocations and throw a "producer is closing" 
exception.
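
The close-able behaviour proposed above can be illustrated with a simplified pool built on a lock and condition. This is only a sketch: `CloseablePool` is a hypothetical stand-in and does not reflect the real BufferPool's internals, and the use of `IllegalStateException` for the "producer is closing" error is an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, hypothetical stand-in for a memory pool; it is not Kafka's BufferPool.
final class CloseablePool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition memoryFreed = lock.newCondition();
    private long available;
    private boolean closed;

    CloseablePool(long totalBytes) {
        this.available = totalBytes;
    }

    /** Reserves size bytes, waiting up to maxBlockMs; fails immediately once the pool is closed. */
    void allocate(long size, long maxBlockMs) throws InterruptedException {
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
        lock.lock();
        try {
            while (true) {
                if (closed) throw new IllegalStateException("producer is closing");
                if (available >= size) { available -= size; return; }
                if (remainingNs <= 0) throw new IllegalStateException("timed out waiting for memory");
                remainingNs = memoryFreed.awaitNanos(remainingNs); // returns the time left to wait
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns memory to the pool and wakes waiters so they can retry. */
    void release(long size) {
        lock.lock();
        try { available += size; memoryFreed.signalAll(); } finally { lock.unlock(); }
    }

    /** Marks the pool closed and wakes every waiter so it can fail fast. */
    void close() {
        lock.lock();
        try { closed = true; memoryFreed.signalAll(); } finally { lock.unlock(); }
    }

    /** Demo: a waiter on an exhausted pool is released by close(); returns the waiter's outcome. */
    static String demoCloseWakesWaiter() {
        CloseablePool pool = new CloseablePool(0); // no memory, so allocate() must wait
        AtomicReference<String> outcome = new AtomicReference<>("still blocked");
        Thread waiter = new Thread(() -> {
            try {
                pool.allocate(1, 60_000); // would otherwise block for up to a minute
                outcome.set("allocated");
            } catch (IllegalStateException | InterruptedException e) {
                outcome.set(String.valueOf(e.getMessage()));
            }
        });
        waiter.start();
        pool.close();
        try { waiter.join(5_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return outcome.get();
    }
}
```

Because `allocate` re-checks the `closed` flag every time it wakes, the waiter fails promptly whether `close()` runs before or after it starts waiting, rather than sitting out the full max.block.ms.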





[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Brian Byrne (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brian Byrne updated KAFKA-8532:
---
Priority: Blocker  (was: Major)

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is 
> the jstack log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Brian Byrne (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brian Byrne updated KAFKA-8532:
---
Priority: Major  (was: Blocker)

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is 
> the jstack log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018175#comment-17018175
 ] 

Ted Yu commented on KAFKA-8532:
---

Created https://github.com/apache/kafka/pull/7978

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is 
> the jstack log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM:


How about making the following change ?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
           inReadLock(initializationLock) {
             send(request) { response =>
               responseQueue.add(response)
-              inFlightRequests.release()
-              countDownLatch.countDown()
             }
           }
-        } catch {
-          case e: Throwable =>
-            inFlightRequests.release()
-            throw e
+        } finally {
+          inFlightRequests.release()
+          countDownLatch.countDown()
         }
       }
       countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

I have run through core:test which passed.

I can send out a PR.


> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the Kafka cluster is to restart the Kafka server. The following is 
> the jstack log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> 

[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working

2020-01-17 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018099#comment-17018099
 ] 

Tom Bentley commented on KAFKA-2526:


[~mgharat] are you working on this, or intending to come back to it?

> Console Producer / Consumer's serde config is not working
> -
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: newbie
>
> Although one can specify the key and value serializers in the console 
> producer, they are not actually used, since 1) it always serializes the input 
> string with String.getBytes (hence always presuming the string serializer) 
> and 2) the serializer is only passed into the old producer. The same issues 
> exist in the console consumer.
> In addition, the configs in the console producer are messy: we have 1) some 
> config values exposed as cmd parameters, 2) some config values in 
> --producer-property, and 3) some in --property.
> It would be great to clean up the configs in both the console producer and 
> consumer, and put them into a single --property parameter, which could 
> possibly also take a file from which to read property values, leaving 
> --new-producer as the only other command line parameter.





[jira] [Resolved] (KAFKA-9218) MirrorMaker 2 can fail to create topics

2020-01-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-9218.
---
Fix Version/s: 2.5.0
   Resolution: Fixed

> MirrorMaker 2 can fail to create topics
> ---
>
> Key: KAFKA-9218
> URL: https://issues.apache.org/jira/browse/KAFKA-9218
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 2.5.0
>
>
> MirrorSourceConnector.refreshTopicPartitions() does not handle topic creation 
> failure correctly.
> If createTopicPartitions() fails to create a topic, the next call to 
> refreshTopicPartitions() will not retry the creation. The creation will 
> only be retried if another topic has been created in the source cluster. This 
> is because knownTopicPartitions is updated before the topic creation is 
> attempted and it's only refreshed if a new topic appears.
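
A minimal sketch of one way to avoid the problem described above, assuming a toy model of the refresh loop. TopicRefreshSketch and the createTopic callback are hypothetical and do not mirror the real MirrorSourceConnector code or the actual fix; the point is only that a topic is recorded as known after its creation succeeds, so a failed creation is retried on the next refresh.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the refresh loop; names do not mirror the real connector.
final class TopicRefreshSketch {
    private final Set<String> knownTopics = new HashSet<>();

    // createTopic returns true on success. A topic is recorded as known only
    // after it was created, so a failed creation is retried on the next refresh.
    void refresh(Set<String> sourceTopics, Predicate<String> createTopic) {
        for (String topic : sourceTopics) {
            if (knownTopics.contains(topic)) {
                continue;
            }
            if (createTopic.test(topic)) {
                knownTopics.add(topic);
            }
        }
    }

    Set<String> knownTopics() { return knownTopics; }
}
```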





[jira] [Commented] (KAFKA-9218) MirrorMaker 2 can fail to create topics

2020-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018037#comment-17018037
 ] 

ASF GitHub Bot commented on KAFKA-9218:
---

mimaison commented on pull request #7745: KAFKA-9218: MirrorMaker 2 can fail to 
create topics
URL: https://github.com/apache/kafka/pull/7745
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MirrorMaker 2 can fail to create topics
> ---
>
> Key: KAFKA-9218
> URL: https://issues.apache.org/jira/browse/KAFKA-9218
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> MirrorSourceConnector.refreshTopicPartitions() does not handle topic creation 
> failure correctly.
> If createTopicPartitions() fails to create a topic, the next call to 
> refreshTopicPartitions() will not retry the creation. The creation will 
> only be retried if another topic has been created in the source cluster. This 
> is because knownTopicPartitions is updated before the topic creation is 
> attempted and it's only refreshed if a new topic appears.





[jira] [Resolved] (KAFKA-8865) KIP-504: New Java Authorizer API

2020-01-17 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8865.
---
Fix Version/s: 2.4.0
   Resolution: Fixed

> KIP-504: New Java Authorizer API
> 
>
> Key: KAFKA-8865
> URL: https://issues.apache.org/jira/browse/KAFKA-8865
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> Parent task for sub-tasks related to KIP-504





[jira] [Resolved] (KAFKA-8847) Deprecate and remove usage of supporting classes in kafka.security.auth

2020-01-17 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8847.
---
  Reviewer: Manikumar
Resolution: Fixed

> Deprecate and remove usage of supporting classes in kafka.security.auth
> ---
>
> Key: KAFKA-8847
> URL: https://issues.apache.org/jira/browse/KAFKA-8847
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> Deprecate Acl, Resource etc. from `kafka.security.auth` and replace 
> references to these with the equivalent Java classes.





[jira] [Commented] (KAFKA-8847) Deprecate and remove usage of supporting classes in kafka.security.auth

2020-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018010#comment-17018010
 ] 

ASF GitHub Bot commented on KAFKA-8847:
---

rajinisivaram commented on pull request #7966: KAFKA-8847; Deprecate and remove 
usage of supporting classes in kafka.security.auth
URL: https://github.com/apache/kafka/pull/7966
 
 
   
 



> Deprecate and remove usage of supporting classes in kafka.security.auth
> ---
>
> Key: KAFKA-8847
> URL: https://issues.apache.org/jira/browse/KAFKA-8847
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> Deprecate Acl, Resource etc. from `kafka.security.auth` and replace 
> references to these with the equivalent Java classes.





[jira] [Commented] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2020-01-17 Thread Stanislav Savulchik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017965#comment-17017965
 ] 

Stanislav Savulchik commented on KAFKA-8406:


[~enether] seems like the issue is fixed in 2.4.0
{noformat}
➜  kafka_2.12-2.4.0 bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter 
--topic topic --config segment.bytes=104857600
Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"{noformat}
 

> kafka-topics throws wrong error on invalid configuration with 
> bootstrap-server and alter config
> ---
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic{code}
> Results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic --partitions 25{code}
> Results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"{code}
> For better clarity, we should just throw the last error outright.





[jira] [Commented] (KAFKA-7787) Add error specifications to KAFKA-7609

2020-01-17 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017873#comment-17017873
 ] 

Tom Bentley commented on KAFKA-7787:


[~cmccabe] I was thinking about this and have a partially working 
implementation, but wanted your thoughts before I spend more time on it.

cc: [~hachikuji] who raised the original comment on the PR for the message 
generator.

The following approach works for all enum-like codes in the protocol, not just 
error codes.

h2. Declaring coded values

The generator will support a new kind of JSON object input {{"type": "codes"}}, 
which describes a set of distinct named integer (byte, short, etc) values. For 
example:
{code:language=js}
{
  "type": "codes",
  "name": "ErrorCodes",
  "valueType": "int16",
  "codes": [
    { "name": "UNKNOWN_SERVER_ERROR", "value": -1,
      "about": "The server experienced an unexpected error when processing the request." },
    { "name": "NONE", "value": 0,
      "about": "No error." },
    { "name": "OFFSET_OUT_OF_RANGE", "value": 1,
      "about": "The requested offset is not within the range of offsets maintained by the server." },
    ...
  ]
}
{code}

 * The {{valueType}} is the type of the integer values.
 * The {{codes}} lists each of the allowed values. The {{about}} is optional.

This would generate a class of constants:

{code:language=java}
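class ErrorCodes {
    public static final short UNKNOWN_SERVER_ERROR = -1;
    public static final short NONE = 0;
    ...

    public static boolean isValid(short v) {
        // MAX stands for the largest declared value (elided above). The lower
        // bound is UNKNOWN_SERVER_ERROR because NONE would reject -1.
        return UNKNOWN_SERVER_ERROR <= v && v <= MAX;
    }
}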
{code}

* The {{isValid()}} method validates that its parameter is one of the allowed 
values.
* It's an error for two constants to have the same value.
* There need be no requirement for the values to be contiguous.
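
Because the values need not be contiguous, a range comparison is only exact when there are no gaps. A minimal sketch of a set-based check, assuming a hypothetical class name ErrorCodesSketch and only the three codes shown in the example; what the generator would really emit is not specified here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical generated class; only the three codes from the example are listed.
final class ErrorCodesSketch {
    static final short UNKNOWN_SERVER_ERROR = -1;
    static final short NONE = 0;
    static final short OFFSET_OUT_OF_RANGE = 1;

    // Membership set, so gaps between declared values are handled correctly.
    private static final Set<Short> VALID = Collections.unmodifiableSet(
        new HashSet<Short>(Arrays.asList(UNKNOWN_SERVER_ERROR, NONE, OFFSET_OUT_OF_RANGE)));

    static boolean isValid(short v) {
        return VALID.contains(v);
    }
}
```

A generated switch statement would avoid the boxing; the set is used here only to keep the sketch short.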


Continuing the example, this allows the existing {{Errors}} enum to be written as:

{code:language=java}
enum Errors {
NONE(ErrorCodes.NONE, ...);
...
}
{code}

h2. Using codes in field specs

The field spec will support a {{domain}} property which names the set of codes 
that values of the field may take. For example an {{ErrorCode}} field:

{code:language=js}
{
  "name": "ErrorCode",
  "type": "int16",
  "domain": {
    "name": "ErrorCodes",
    "values": [
      { "name": "NONE", "validVersions": "0+" },
      { "name": "FOO", "validVersions": "0+" },
      { "name": "BAR", "validVersions": "3+" },
      ...
    ]
  }
}
{code}

* The {{name}} is the name of a corresponding codes declaration.
* The {{values}} property is optional. When it is missing, any of the values in 
the codes declaration is permitted. When it is present, only the given values 
are allowed. Each value is given as an object with a {{name}} that identifies a 
value from the codes declaration and, optionally, a {{validVersions}} that 
restricts the code to the given versions of the message.

The owning {{Data}} class (or inner classes of the {{Data}} class) will gain a 
method for validating the error codes. The implementation would depend on 
whether {{values}} and/or {{validVersions}} were given, but might look like this:

{code:language=java}
public static boolean isValidErrorCode(short v, short version) {
    switch (version) {
        case 0:
        case 1:
        case 2:
            return v == ErrorCodes.NONE || v == ErrorCodes.FOO;
        case 3:
            return v == ErrorCodes.NONE || v == ErrorCodes.FOO || v == ErrorCodes.BAR;
        default:
            // Assumes 3 is the highest message version; anything else is rejected.
            return false;
    }
}
{code}

h2. Validation

We can call the validation methods and throw:

 * When serializing requests
 * When deserializing requests
 * When serializing responses, except for error code fields.

The reason for distinguishing error code fields arises from the difficulty of 
knowing for certain which exception types can be thrown in the code called from 
the handler in the broker. We don't want a mistake in the allowed error codes to 
result in a needless exception in the broker. So for these, instead of throwing, 
we could log the unexpected value.
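
A minimal sketch of the two policies, assuming a hypothetical helper class. FieldValidationSketch, checkField and checkErrorCode are illustrative names only, and a list of warnings stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the generated code may look quite different.
final class FieldValidationSketch {
    // Stand-in for a logger, so that the sketch has no dependencies.
    final List<String> warnings = new ArrayList<>();

    // Ordinary coded fields: an unexpected value is a hard failure.
    void checkField(String field, short value, boolean valid) {
        if (!valid) {
            throw new IllegalArgumentException("Invalid value " + value + " for field " + field);
        }
    }

    // Error code fields in responses: record the problem and let the response go out.
    void checkErrorCode(String field, short value, boolean valid) {
        if (!valid) {
            warnings.add("Unexpected error code " + value + " in field " + field);
        }
    }
}
```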

We could use properties of the field spec to configure what code was generated 
for serialization and deserialization on a per-message basis.

Thoughts?



> Add error specifications to KAFKA-7609
> --
>
> Key: KAFKA-7787
> URL: https://issues.apache.org/jira/browse/KAFKA-7787
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin McCabe
>Assignee: Tom Bentley
>Priority: Minor
>
> In our RPC JSON, it would be nice if we could specify what versions of a 
> response could contain what errors.  See the discussion here: 
> https://github.com/apache/kafka/pull/5893#discussion_r244841051



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7787) Add error specifications to KAFKA-7609

2020-01-17 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-7787:
--

Assignee: Tom Bentley






[jira] [Commented] (KAFKA-6879) Controller deadlock following session expiration

2020-01-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017827#comment-17017827
 ] 

leibo commented on KAFKA-6879:
--

Hello [~hachikuji], I have hit this issue many times on Kafka 2.1.1. The 
details are described in https://issues.apache.org/jira/browse/KAFKA-8532

So I think the controller deadlock problem has not been completely solved.

> Controller deadlock following session expiration
> 
>
> Key: KAFKA-6879
> URL: https://issues.apache.org/jira/browse/KAFKA-6879
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 1.1.1, 2.0.0
>
>
> We have observed an apparent deadlock situation which occurs following a 
> session expiration. The suspected deadlock occurs between the zookeeper 
> "initializationLock" and the latch inside the Expire event which we use to 
> ensure all events have been handled.
> In the logs, we see the "Session expired" message following acquisition of 
> the initialization lock: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala#L358
> But we never see any logs indicating that the new session is being 
> initialized. In fact, the controller logs are basically empty from that point 
> on. The problem we suspect is that completion of the 
> {{beforeInitializingSession}} callback requires that all events have finished 
> processing in order to count down the latch: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1525.
> But an event which was dequeued just prior to the acquisition of the write 
> lock may be unable to complete because it is awaiting acquisition of the 
> initialization lock: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala#L137.
> The impact is that the broker continues in a zombie state. It continues 
> fetching and is periodically added to ISRs, but it never receives any further 
> requests from the controller since it is not registered.
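
The cycle described in the quoted issue can be sketched in simplified form as 
follows. This is not the controller code (which is Scala); the class, method 
and variable names are hypothetical, and timeouts are used only so that the 
sketch terminates instead of hanging. One thread holds the write lock while 
waiting on a latch, and the only thread that could count the latch down is 
waiting for the read lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LatchLockDeadlockSketch {
    // Returns true if the latch wait timed out, i.e. the two threads blocked each other.
    static boolean demonstrate() throws InterruptedException {
        ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
        CountDownLatch eventsProcessed = new CountDownLatch(1);
        CountDownLatch writeLockHeld = new CountDownLatch(1);

        // Stands in for the event thread with an in-flight event that needs the read lock.
        Thread eventThread = new Thread(() -> {
            try {
                writeLockHeld.await();
                if (initializationLock.readLock().tryLock(200, TimeUnit.MILLISECONDS)) {
                    initializationLock.readLock().unlock();
                    // Only reached if the in-flight event could finish.
                    eventsProcessed.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        eventThread.start();

        // Stands in for session expiry: take the write lock, then wait for the latch.
        boolean stuck;
        initializationLock.writeLock().lock();
        try {
            writeLockHeld.countDown();
            stuck = !eventsProcessed.await(500, TimeUnit.MILLISECONDS);
        } finally {
            initializationLock.writeLock().unlock();
        }
        eventThread.join();
        return stuck;
    }
}
```

Without the timeouts, both waits would block indefinitely, which matches the 
zombie state described above.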


