[jira] [Commented] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
[ https://issues.apache.org/jira/browse/KAFKA-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932056#comment-16932056 ] Igamr Palsenberg commented on KAFKA-8914:
-
The broker is 2.3.0 according to brew. I didn't change any configuration on the broker side, except for explicitly setting the bind address.

> Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
> --
>
> Key: KAFKA-8914
> URL: https://issues.apache.org/jira/browse/KAFKA-8914
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: Igamr Palsenberg
> Priority: Major
>
> Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any value (I tried valid numbers and just some string values) results in:
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
>   at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
>   at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>
> Kafka broker: Kafka 2.3.0, installed using Brew. Default config, except the bind IP, which is set explicitly to localhost.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
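The SchemaException above wraps a java.nio.BufferUnderflowException that is raised while reading the leading 'version' field in ConsumerProtocol.deserializeAssignment. One plausible reading is that the consumer received an assignment payload shorter than the 2-byte int16 version field, for example an empty one. That is an assumption, not something confirmed in this thread.

The minimal sketch below does not touch the Kafka client at all. It only demonstrates the JDK behaviour involved: a relative getShort() on a buffer with fewer than two remaining bytes throws BufferUnderflowException. EmptyAssignmentSketch and readVersion are hypothetical names for illustration.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class EmptyAssignmentSketch {
    /**
     * Hypothetical helper: reads a leading int16 "version" field, returning null
     * instead of throwing when fewer than 2 bytes remain. Not Kafka's ConsumerProtocol code.
     */
    public static Short readVersion(ByteBuffer buffer) {
        try {
            return buffer.getShort();
        } catch (BufferUnderflowException e) {
            // Same exception type as the one wrapped in the reported SchemaException.
            return null;
        }
    }

    public static void main(String[] args) {
        // Assumption: models an empty assignment payload.
        ByteBuffer empty = ByteBuffer.allocate(0);
        ByteBuffer versionZero = ByteBuffer.allocate(2);
        versionZero.putShort((short) 0);
        versionZero.flip(); // switch from writing to reading
        System.out.println("empty -> " + readVersion(empty));        // empty -> null
        System.out.println("v0    -> " + readVersion(versionZero));  // v0    -> 0
    }
}
```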
[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leibo updated KAFKA-8532:
-
Affects Version/s: 2.3.0

> controller-event-thread deadlock with zk-session-expiry-handler0
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.1, 2.3.0
> Reporter: leibo
> Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
> We have observed a serious deadlock between controller-event-thread and the zk-session-expiry-handler0 thread. When this issue occurs, the only way to recover the Kafka cluster is to restart the Kafka server. The following is the jstack output for controller-event-thread and zk-session-expiry-handler0.
>
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>    java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x0005ee3f7000> (a java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // waiting for controller-event-thread to process the Expire event
>   at kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>   at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>   at kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>   at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>   at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>   at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown Source)
>   at scala.collection.Iterator.foreach(Iterator.scala:937)
>   at scala.collection.Iterator.foreach$(Iterator.scala:937)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>   at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>   at kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>   at kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown Source)
>   at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>   at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown Source)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>   - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 nid=0x310 waiting on condition [0x7fccb55c8000]
>    java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x0005d1be5a00> (a java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>   at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>   at
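The two stacks form a circular wait:

- zk-session-expiry-handler0 is parked in Expire.waitUntilProcessingStarted until controller-event-thread picks up the Expire event.
- controller-event-thread is parked in handleRequests waiting for a ZooKeeper response.

Below is a minimal Java model of that cycle, not Kafka code. It rests on simplifying assumptions. The ZooKeeper request is replaced by a single latch (zkResponse) that is hypothetically released only after the session is reinitialized. The event queue holds plain latches. ExpireDeadlockSketch, reproduce and zkResponse are hypothetical names. Timed join() calls detect the hang instead of blocking forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class ExpireDeadlockSketch {
    /** Returns true if both threads are still parked after the timeout, i.e. the cycle reproduced. */
    public static boolean reproduce(long timeoutMs) throws InterruptedException {
        // Stand-in for ControllerEventManager's queue. Each event is a latch that
        // the controller thread counts down when it starts processing the event.
        BlockingQueue<CountDownLatch> eventQueue = new LinkedBlockingQueue<>();
        // Hypothetical: the in-flight ZooKeeper request completes only after reinitialization.
        CountDownLatch zkResponse = new CountDownLatch(1);

        Thread controller = new Thread(() -> {
            try {
                zkResponse.await();              // like handleRequests -> countDownLatch.await()
                eventQueue.take().countDown();   // only then would it start processing Expire
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "controller-event-thread");

        Thread expiryHandler = new Thread(() -> {
            try {
                CountDownLatch expireStarted = new CountDownLatch(1);
                eventQueue.put(expireStarted);   // enqueue the Expire event
                expireStarted.await();           // like Expire.waitUntilProcessingStarted()
                zkResponse.countDown();          // reinitialization would proceed from here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "zk-session-expiry-handler0");

        controller.setDaemon(true);
        expiryHandler.setDaemon(true);
        controller.start();
        expiryHandler.start();

        controller.join(timeoutMs);
        expiryHandler.join(timeoutMs);
        boolean deadlocked = controller.isAlive() && expiryHandler.isAlive();

        // Break the cycle so the sketch terminates.
        controller.interrupt();
        expiryHandler.interrupt();
        controller.join();
        expiryHandler.join();
        return deadlocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlocked = " + reproduce(300)); // deadlocked = true
    }
}
```

Neither latch can reach zero, because each is counted down only by the thread parked on the other. The result is therefore deterministic: main prints `deadlocked = true` after roughly twice the timeout, since each timed join waits out the full period.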
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:27 AM:
---
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled by the ControllerEventThread. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.

was (Author: lbdai3190):
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.
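The hazard pointed at in this comment comes from the unbounded countDownLatch.await(): the caller returns only once every send callback has run.

Below is a simplified Java analogue of that Scala method. It assumes a hypothetical AsyncSender interface and omits the inFlightRequests semaphore and initializationLock. It calls the timed CountDownLatch.await(long, TimeUnit) overload, so a callback that never fires surfaces as a TimeoutException instead of a permanently parked thread. The timeout is an illustrative change, not a fix proposed in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class LatchBatchSketch {
    /** Hypothetical async client: may run the callback later, or never (a lost response). */
    public interface AsyncSender {
        void send(String request, Consumer<String> callback);
    }

    /** Simplified analogue of the Scala handleRequests: no semaphore, no lock, bounded wait. */
    public static List<String> handleRequests(List<String> requests, AsyncSender sender, long timeoutMs)
            throws InterruptedException, TimeoutException {
        if (requests.isEmpty()) {
            return Collections.emptyList();
        }
        CountDownLatch latch = new CountDownLatch(requests.size());
        BlockingQueue<String> responses = new ArrayBlockingQueue<>(requests.size());
        for (String request : requests) {
            sender.send(request, response -> {
                responses.add(response);
                latch.countDown(); // one count per completed request
            });
        }
        // Untimed await() would park forever if any callback never runs;
        // the timed overload returns false once the deadline passes.
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            long received = requests.size() - latch.getCount();
            throw new TimeoutException(received + " of " + requests.size() + " responses received");
        }
        return new ArrayList<>(responses);
    }

    public static void main(String[] args) throws Exception {
        List<String> requests = Arrays.asList("a", "b");
        // Every request is answered immediately.
        System.out.println(handleRequests(requests, (req, cb) -> cb.accept(req + "-ok"), 200));
        // The response to "b" is lost: the call times out instead of hanging.
        try {
            handleRequests(requests, (req, cb) -> { if (!req.equals("b")) cb.accept(req); }, 200);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Running main prints `[a-ok, b-ok]` and then `timed out: 1 of 2 responses received`.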
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.

was (Author: lbdai3190):
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.

was (Author: lbdai3190):
[~junrao], [~ijuma] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:24 AM:
---
[~junrao] My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.

was (Author: lbdai3190):
My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.
[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo commented on KAFKA-8532:
--
My analysis:
When Kafka is disconnected from ZooKeeper while controller-event-thread is handling an ISRChangeNotification event, the event handling blocks in retryRequestsUntilConnected -> handleRequests, because the ZooKeeper connection is down.
At this time, the zk-session-expiry-handler0 thread tries to reinitialize a new ZooKeeper connection: it puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue. Because controller-event-thread is blocked on the ISRChangeNotification event, the Expire event is never handled. The problem is clear: ISRChangeNotification and Expire block each other.
The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() is not executed, and this method blocks here forever.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:25 AM: --- [~junrao], [~ijuma] My analysis: When kafka disconnected with zookeeper, Controller-event-thread is handing ISRChangeNotification event, because kafka is disconnected with zk, controller event handle is blocked on retryRequestUntilConnected -> handleRequest method. At this time, zk-session-expired-handle thread is try to reinitialize a new zk connection with zookeeper, it put a Controller event named Expire to ControllerEventManager LinkedBlockingQueue queue, due to controller-event-thread is blocked on ISRChangeNotification event, the Expire event will not be handle. The problem is obviously. ISRChangeNotification and Expire are block each other. The problem is here: {code:java} //代码占位符 def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { if (requests.isEmpty) Seq.empty else { val countDownLatch = new CountDownLatch(requests.size) val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) requests.foreach { request => inFlightRequests.acquire() try { inReadLock(initializationLock) { send(request) { response => responseQueue.add(response) inFlightRequests.release() countDownLatch.countDown() } } } catch { case e: Throwable => inFlightRequests.release() throw e } } countDownLatch.await() responseQueue.asScala.toBuffer } } {code} we can see, if exception is occurred, countDownLatch.countDown() will not be execute, and this method will always blocked here. was (Author: lbdai3190): [~junrao] My analysis: When kafka disconnected with zookeeper, Controller-event-thread is handing ISRChangeNotification event, because kafka is disconnected with zk, controller event handle is blocked on retryRequestUntilConnected -> handleRequest method. 
At this time, zk-session-expired-handle thread is try to reinitialize a new zk connection with zookeeper, it put a Controller event named Expire to ControllerEventManager LinkedBlockingQueue queue, due to controller-event-thread is blocked on ISRChangeNotification event, the Expire event will not be handle. The problem is obviously. ISRChangeNotification and Expire are block each other. The problem is here: {code:java} //代码占位符 def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { if (requests.isEmpty) Seq.empty else { val countDownLatch = new CountDownLatch(requests.size) val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) requests.foreach { request => inFlightRequests.acquire() try { inReadLock(initializationLock) { send(request) { response => responseQueue.add(response) inFlightRequests.release() countDownLatch.countDown() } } } catch { case e: Throwable => inFlightRequests.release() throw e } } countDownLatch.await() responseQueue.asScala.toBuffer } } {code} we can see, if exception is occurred, countDownLatch.countDown() will not be execute, and this method will always blocked here. > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between controller-event-thead and > zk-session-expirey-handle thread. When this issue occurred, it's only one way > to recovery the kafka cluster is restart kafka server. The follows is the > jstack log of controller-event-thead and zk-session-expiry-handle thread. 
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at >
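The analysis above rests on a `CountDownLatch` that is only counted down inside the response callback, so any request whose callback never runs leaves `await()` parked forever. As a hedged illustration only (not Kafka's code and not the eventual fix for this ticket), the Java sketch below shows two defensive variations of that pattern: counting down immediately when a send fails synchronously, and bounding the final wait with `await(long, TimeUnit)`. The `AsyncSender` interface, the `sendAllAndWait` method, and the `"error:"` placeholder response are all hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchSafeBatch {

    /** Hypothetical async transport: may invoke the callback later, or never. */
    interface AsyncSender {
        void send(String request, Consumer<String> callback);
    }

    /**
     * Sends every request and waits for the callbacks without waiting forever:
     * a request whose send() throws is counted down immediately (its callback
     * will never run), and the final await is bounded by a timeout.
     */
    static List<String> sendAllAndWait(List<String> requests, AsyncSender sender, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(requests.size());
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        for (String request : requests) {
            try {
                sender.send(request, response -> {
                    responses.add(response);
                    latch.countDown();
                });
            } catch (RuntimeException e) {
                // Record a placeholder failure and release this request's slot in the latch.
                responses.add("error:" + request);
                latch.countDown();
            }
        }
        try {
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException(
                        "timed out waiting for " + latch.getCount() + " response(s)");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for responses", e);
        }
        return new ArrayList<>(responses);
    }

    public static void main(String[] args) {
        // "a" is answered synchronously and "bad" throws: the call returns instead of hanging.
        AsyncSender flaky = (req, cb) -> {
            if (req.equals("bad")) {
                throw new IllegalStateException("not connected");
            }
            cb.accept("ok:" + req);
        };
        System.out.println(sendAllAndWait(List.of("a", "bad"), flaky, 1000)); // prints [ok:a, error:bad]

        // A sender whose callback never fires: the bounded await gives up after 100 ms.
        AsyncSender silent = (req, cb) -> { };
        try {
            sendAllAndWait(List.of("x"), silent, 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints timed out waiting for 1 response(s)
        }
    }
}
```

The timeout does not remove the underlying dependency between the two threads; it only turns a permanent hang into an error the caller can handle, which is why a real fix would still need to decide what the controller should do when the ZooKeeper session is gone.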
[jira] [Updated] (KAFKA-8915) Unable to modify partition
[ https://issues.apache.org/jira/browse/KAFKA-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin updated KAFKA-8915: --- Summary: Unable to modify partition (was: 无法修改partition, the Chinese original of the same title) > Unable to modify partition -- > > Key: KAFKA-8915 > URL: https://issues.apache.org/jira/browse/KAFKA-8915 > Project: Kafka > Issue Type: Bug >Reporter: lingyi.zhong >Priority: Major > >
> [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 --replication-factor 1 --partitions 1 --topic test_topic3
> WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
> Created topic "test_topic3".
> [root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partition 2 --topic test_topic3
> Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option
> at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
> at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
> at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
> at joptsimple.OptionParser.parse(OptionParser.java:396)
> at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932006#comment-16932006 ] leibo commented on KAFKA-8532: -- Hi [~junrao], the problem happened again, and the cluster's jstack dump logs have been uploaded. Please help analyze this problem, thanks.
[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leibo updated KAFKA-8532: - Attachment: js2.log > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between the controller-event-thread and the zk-session-expiry-handler thread. When this issue occurs, the only way to recover the Kafka cluster is to restart the Kafka server. The following is the jstack log of the controller-event-thread and zk-session-expiry-handler threads. > "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // waiting for the controller-event-thread to process the Expire event > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at >
kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at > kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown > Source) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Locked ownable synchronizers: > - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker) > "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 > nid=0x310 waiting on condition [0x7fccb55c8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005d1be5a00> (a > 
java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157) > at > kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596) > at >
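Both jstack excerpts show a thread parked in `CountDownLatch.await()`: zk-session-expiry-handler0 waits in `Expire.waitUntilProcessingStarted` for the Expire event to start, while the controller-event-thread waits on a latch inside `ZooKeeperClient.handleRequests`. The cycle can be reproduced in miniature with a single-threaded executor standing in for the controller-event-thread. This is a simplified, hypothetical model (the class, the method, and the `sessionReinitialized` latch that stands in for the ZooKeeper reconnect are invented for illustration), not Kafka's controller code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ControllerDeadlockModel {

    /**
     * Returns true if the modelled Expire event starts within timeoutMs.
     * A single-threaded executor stands in for the controller-event-thread.
     */
    static boolean expireStartsWithin(long timeoutMs) {
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        // Released only after the session is re-established, which needs Expire to start first.
        CountDownLatch sessionReinitialized = new CountDownLatch(1);
        // Counted down when Expire begins processing (cf. waitUntilProcessingStarted).
        CountDownLatch expireStarted = new CountDownLatch(1);
        try {
            // Event 1, like ISRChangeNotification: blocks until the ZK session is back.
            eventThread.submit(() -> {
                try {
                    sessionReinitialized.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Event 2, like Expire: queued behind event 1 on the same thread.
            eventThread.submit(expireStarted::countDown);

            // The expiry handler waits for Expire to start before reinitializing the session.
            boolean started = expireStarted.await(timeoutMs, TimeUnit.MILLISECONDS);
            if (started) {
                sessionReinitialized.countDown(); // the real handler would reconnect here
            }
            return started;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            eventThread.shutdownNow(); // interrupts event 1 so the model can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("Expire started: " + expireStartsWithin(200)); // prints Expire started: false
    }
}
```

In the model, event 1 waits for something only the expiry handler can provide, and the expiry handler waits for an event queued behind event 1, so neither can make progress. The real threads use an unbounded `await()`, which is why the broker stays stuck until it is restarted.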
[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leibo updated KAFKA-8532: - Attachment: js1.log
[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leibo updated KAFKA-8532: - Attachment: js0.log
[jira] [Commented] (KAFKA-8104) Consumer cannot rejoin to the group after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931990#comment-16931990 ] Charles Qian commented on KAFKA-8104: - Same issue with kafka-clients 2.2.0. It happens in large topologies, e.g. when each topic has more than 16 partitions, so that for each topic the client needs more than 16 consumer threads. > Consumer cannot rejoin to the group after rebalancing > - > > Key: KAFKA-8104 > URL: https://issues.apache.org/jira/browse/KAFKA-8104 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Gregory Koshelev >Priority: Critical > Attachments: consumer-rejoin-fail.log > > > TL;DR: {{KafkaConsumer}} cannot rejoin the group because {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}}) is inconsistent with {{AbstractCoordinator.joinFuture}} (which is a succeeded {{RequestFuture}}). See the explanation below. > There are 16 consumers in a single process (threads pool-4-thread-1 to pool-4-thread-16). All of them belong to a single consumer group, {{hercules.sink.elastic.legacy_logs_elk_c2}}. A rebalance occurred and the consumers got a {{CommitFailedException}}, as expected: > {noformat} > 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN > r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be > completed since the group has already rebalanced and assigned the partitions > to another member. This means that the time between subsequent calls to > poll() was longer than the configured max.poll.interval.ms, which typically > implies that the poll loop is spending too much time message processing. You > can address this either by increasing the session timeout or by reducing the > maximum size of batches returned in poll() with max.poll.records.
> at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298) > at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156) > at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > After that, most of them successfully rejoined the group with generation 10699: > {noformat} > 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group > with generation 10699 > 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-18] > ... > 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group > with generation 10699 > 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11] > ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-24] > 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed > since group is rebalancing > 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed > since group is rebalancing > 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer
[jira] [Commented] (KAFKA-8086) Flaky Test GroupAuthorizerIntegrationTest#testPatternSubscriptionWithTopicAndGroupRead
[ https://issues.apache.org/jira/browse/KAFKA-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931950#comment-16931950 ] ASF GitHub Bot commented on KAFKA-8086: --- guozhangwang commented on pull request #7356: KAFKA-8086: Use 1 partition for offset topic when possible URL: https://github.com/apache/kafka/pull/7356 I realized some flaky tests failed in `setup` or in calls that try to create the offsets topic, and I think using one partition and one replica would be sufficient in these cases. > Flaky Test > GroupAuthorizerIntegrationTest#testPatternSubscriptionWithTopicAndGroupRead > -- > > Key: KAFKA-8086 > URL: https://issues.apache.org/jira/browse/KAFKA-8086 > Project: Kafka > Issue Type: Bug > Components: admin, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J.
Sax >Priority: Critical > Labels: flaky-test > Fix For: 2.4.0 > > > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/62/testReport/junit/kafka.api/GroupAuthorizerIntegrationTest/testPatternSubscriptionWithTopicAndGroupRead/] > {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata > not propagated after 15000 ms at > kafka.utils.TestUtils$.fail(TestUtils.scala:381) at > kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at > kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at > kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at > kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at > scala.collection.immutable.Range.foreach(Range.scala:158) at > scala.collection.TraversableLike.map(TraversableLike.scala:237) at > scala.collection.TraversableLike.map$(TraversableLike.scala:230) at > scala.collection.AbstractTraversable.map(Traversable.scala:108) at > kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at > kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at > kafka.api.AuthorizerIntegrationTest.setUp(AuthorizerIntegrationTest.scala:242){quote} > STDOUT > {quote}[2019-03-09 08:40:34,220] ERROR [KafkaApi-0] Error when handling > request: clientId=0, correlationId=0, api=UPDATE_METADATA, > body=\{controller_id=0,controller_epoch=1,broker_epoch=25,topic_states=[],live_brokers=[{id=0,end_points=[{port=41020,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]} > (kafka.server.KafkaApis:76) > org.apache.kafka.common.errors.ClusterAuthorizationException: Request > Request(processor=0, connectionId=127.0.0.1:41020-127.0.0.1:52304-0, > session=Session(Group:testGroup,/127.0.0.1), > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > buffer=null) is not authorized. 
[2019-03-09 08:40:35,336] ERROR [Consumer > clientId=consumer-98, groupId=my-group] Offset commit failed on partition > topic-0 at offset 5: Not authorized to access topics: [Topic authorization > failed.] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:812) > [2019-03-09 08:40:35,336] ERROR [Consumer clientId=consumer-98, > groupId=my-group] Not authorized to commit to topics [topic] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:850) > [2019-03-09 08:40:41,649] ERROR [KafkaApi-0] Error when handling request: > clientId=0, correlationId=0, api=UPDATE_METADATA, > body=\{controller_id=0,controller_epoch=1,broker_epoch=25,topic_states=[],live_brokers=[{id=0,end_points=[{port=36903,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]} > (kafka.server.KafkaApis:76) > org.apache.kafka.common.errors.ClusterAuthorizationException: Request > Request(processor=0, connectionId=127.0.0.1:36903-127.0.0.1:44978-0, > session=Session(Group:testGroup,/127.0.0.1), > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > buffer=null) is not authorized. [2019-03-09 08:40:53,898] ERROR [KafkaApi-0] > Error when handling request: clientId=0, correlationId=0, > api=UPDATE_METADATA, >
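The PR comment above proposes creating the offsets topic with one partition and one replica in tests, and the failure it targets is `TestUtils.createOffsetsTopic` timing out while waiting for `__consumer_offsets` partition metadata. The notification does not show the actual diff. As a hedged sketch, assuming the test brokers are configured through ordinary broker properties, the two standard broker settings that size this internal topic could be overridden as below. Whether PR #7356 uses these exact settings is not stated in the source, and the class and method names are hypothetical.

```java
import java.util.Properties;

public class OffsetsTopicTestOverrides {

    /**
     * Broker overrides for a small test cluster: create the internal
     * __consumer_offsets topic with one partition and one replica
     * (the broker defaults are 50 partitions and replication factor 3).
     */
    static Properties offsetsTopicOverrides() {
        Properties props = new Properties();
        props.setProperty("offsets.topic.num.partitions", "1");
        props.setProperty("offsets.topic.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = offsetsTopicOverrides();
        System.out.println("offsets.topic.num.partitions="
                + props.getProperty("offsets.topic.num.partitions"));
        System.out.println("offsets.topic.replication.factor="
                + props.getProperty("offsets.topic.replication.factor"));
    }
}
```

With a single partition there is only one `__consumer_offsets` partition whose metadata has to propagate during test setup, instead of many.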
[jira] [Resolved] (KAFKA-8919) Flaky Test kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess
[ https://issues.apache.org/jira/browse/KAFKA-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8919. -- Resolution: Fixed Realized this was fixed as part of polling (1L) in https://github.com/apache/kafka/pull/7312 > Flaky Test > kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess > > > Key: KAFKA-8919 > URL: https://issues.apache.org/jira/browse/KAFKA-8919 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Guozhang Wang >Priority: Major > Labels: flaky-test > > {code} > Stacktrace > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) > at org.scalatest.Assertions.fail$(Assertions.scala:1087) > at org.scalatest.Assertions$.fail(Assertions.scala:1389) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:830) > at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:795) > at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1325) > at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1333) > at > kafka.api.AuthorizerIntegrationTest.consumeRecords(AuthorizerIntegrationTest.scala:1772) > at > kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess(AuthorizerIntegrationTest.scala:813) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at
java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor110.invoke(Unknown > Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at >
[jira] [Commented] (KAFKA-8913) Document topic based configs & ISR settings for Streams apps
[ https://issues.apache.org/jira/browse/KAFKA-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931936#comment-16931936 ] ASF GitHub Bot commented on KAFKA-8913: --- mjsax commented on pull request #7346: KAFKA-8913: Document topic based configs & ISR settings for Streams apps URL: https://github.com/apache/kafka/pull/7346 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document topic based configs & ISR settings for Streams apps > > > Key: KAFKA-8913 > URL: https://issues.apache.org/jira/browse/KAFKA-8913 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Vinoth Chandar >Assignee: Vinoth Chandar >Priority: Major > > Noticed that it was not clear how to configure the internal topics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
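To make the question in KAFKA-8913 concrete, here is a minimal sketch of the settings involved, assuming a Java Streams client: Kafka Streams forwards configs prefixed with `topic.` to the internal (changelog and repartition) topics it creates, and configs prefixed with `producer.` to its embedded producer. The class name, application id, bootstrap address, replication factor of 3 and `min.insync.replicas` of 2 are illustrative assumptions, not values taken from the ticket or PR #7346. Plain string keys are used so the sketch compiles without a Kafka dependency.

```java
import java.util.Properties;

// Hypothetical helper: builds Streams properties that configure internal topics.
// Values (3 replicas, min ISR 2) are illustrative assumptions only.
class StreamsInternalTopicConfig {

    // Same value as StreamsConfig.TOPIC_PREFIX: configs with this prefix are
    // applied to the internal topics that Kafka Streams creates.
    static final String TOPIC_PREFIX = "topic.";

    // Same value as StreamsConfig.PRODUCER_PREFIX: forwarded to the embedded producer.
    static final String PRODUCER_PREFIX = "producer.";

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Replication factor of internal topics (StreamsConfig.REPLICATION_FACTOR_CONFIG).
        props.put("replication.factor", "3");
        // Topic-level ISR requirement passed through to internal topics.
        props.put(TOPIC_PREFIX + "min.insync.replicas", "2");
        // min.insync.replicas is only enforced for writes sent with acks=all.
        props.put(PRODUCER_PREFIX + "acks", "all");
        return props;
    }
}
```

Using prefixes keeps topic-level and producer-level settings in the single `Properties` object an application already passes to Streams.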
[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931931#comment-16931931 ] Matthias J. Sax commented on KAFKA-7499: Alaa, thanks for looking into this. I'll reply on the mailing list: the discussion is supposed to happen there, not on the ticket. > Extend ProductionExceptionHandler to cover serialization exceptions > --- > > Key: KAFKA-7499 > URL: https://issues.apache.org/jira/browse/KAFKA-7499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Alaa Zbair >Priority: Major > Labels: beginner, kip, newbie > > In > [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce], > an exception handler for the write path was introduced. This exception > handler covers exceptions that are raised in the producer callback. > However, serialization happens before the data is handed to the producer, within > Kafka Streams itself, and the producer uses `byte[]/byte[]` key-value-pair > types. > Thus, we might want to extend the ProductionExceptionHandler to cover > serialization exceptions, too, to skip over corrupted output messages. An > example could be a "String" message that contains invalid JSON and should be > serialized as JSON. > KIP-399 (not voted yet; feel free to pick it up): > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions] -- This message was sent by Atlassian Jira (v8.3.4#803005)
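The idea behind KIP-399 can be sketched as follows. Because the KIP was not voted at the time of this comment, every name below (`SerializationExceptionHandler`, `Response`, `serializeOrSkip`, `toyJsonSerializer`) is hypothetical and is not a Kafka API. The sketch only shows where such a handler would sit: serialization runs inside Streams before the `byte[]/byte[]` record reaches the producer, so a failure there never reaches the producer callback that the KIP-210 handler covers.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch of a serialization-error hook; none of these names are Kafka APIs.
class SerializationHandlingSketch {

    enum Response { CONTINUE, FAIL }

    // Hypothetical callback invoked when serializing an output record throws.
    interface SerializationExceptionHandler {
        Response handle(Object key, Object value, RuntimeException error);
    }

    // Serialize before the bytes would be handed to the producer; on error, ask the handler.
    // Returns null when the handler decides the record should be skipped.
    static byte[] serializeOrSkip(Object value,
                                  Function<Object, byte[]> serializer,
                                  SerializationExceptionHandler handler) {
        try {
            return serializer.apply(value);
        } catch (RuntimeException e) {
            if (handler.handle(null, value, e) == Response.CONTINUE) {
                return null; // skip the corrupted output record
            }
            throw e; // FAIL: propagate as today
        }
    }

    // Toy stand-in for a JSON serializer: rejects strings that are not JSON objects.
    static byte[] toyJsonSerializer(Object value) {
        String s = (String) value;
        if (!(s.startsWith("{") && s.endsWith("}"))) {
            throw new IllegalArgumentException("invalid JSON: " + s);
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

The toy serializer plays the role of the invalid-JSON `String` example from the ticket; signalling "skip" with `null` is a simplification for this sketch only.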
[jira] [Commented] (KAFKA-8595) Support SerDe of Decimals in JSON that are not HEX encoded
[ https://issues.apache.org/jira/browse/KAFKA-8595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931926#comment-16931926 ] ASF GitHub Bot commented on KAFKA-8595: --- agavra commented on pull request #7354: KAFKA-8595: Support deserialization of decimals encoded in NUMERIC fo… URL: https://github.com/apache/kafka/pull/7354 see [KIP-481](https://cwiki.apache.org/confluence/display/KAFKA/KIP-481%3A+SerDe+Improvements+for+Connect+Decimal+type+in+JSON) for more details Review Guide: - Added `DecimalFormat` enum to represent different JSON decimal node types and corresponding config in `JsonConverterConfig` - Split `LogicalTypeConverter` to have two strongly typed methods (`toJson` and `toConnect`) so that we could group SerDe methods together (this caused some code to move around because I could combine `TO_JSON_LOGICAL_CONVERTERS` and `TO_CONNECT_LOGICAL_CONVERTERS`) - Implemented the SerDe in the line containing `LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter()` - Added `CaseInsensitiveValidString` to the validators Unit tests covering all of the formats are added. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support SerDe of Decimals in JSON that are not HEX encoded > -- > > Key: KAFKA-8595 > URL: https://issues.apache.org/jira/browse/KAFKA-8595 > Project: Kafka > Issue Type: Improvement >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Major > > Most JSON data that utilizes precise decimal data represents it as a decimal > string. 
Kafka Connect, on the other hand, only supports a binary HEX string > encoding (see example below). We should support deserialization and > serialization for any of the following types: > {code:java} > { > "asHex": "D3J5", > "asString": "10.12345", > "asNumber": 10.12345 > }{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
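For reference, the `asHex` value in the example is, despite the field name, base64 text. Connect's `Decimal` logical type stores the unscaled value as big-endian two's-complement bytes, and the JSON converter writes those bytes as base64. Assuming a schema scale of 5 (the scale lives in the schema, not in the JSON value), `D3J5` decodes to the bytes `0x0F 0x72 0x79`, i.e. the unscaled value 1012345, i.e. 10.12345. The class name below is hypothetical; the sketch uses only JDK classes to show the round trip.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper showing the base64 <-> BigDecimal round trip used by the
// "asHex" representation. The scale must come from the Connect schema.
class ConnectDecimalJson {

    // Decode base64 text into unscaled two's-complement bytes, then apply the scale.
    static BigDecimal fromBase64(String encoded, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(encoded);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // Encode the unscaled value's big-endian bytes as base64 (the scale is dropped).
    static String toBase64(BigDecimal value) {
        return Base64.getEncoder().encodeToString(value.unscaledValue().toByteArray());
    }
}
```

Here `fromBase64("D3J5", 5)` yields `10.12345` and `toBase64(new BigDecimal("10.12345"))` yields `"D3J5"`, which is why a reader cannot interpret the base64 form without also knowing the schema's scale.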
[jira] [Created] (KAFKA-8919) Flaky Test kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess
Guozhang Wang created KAFKA-8919: Summary: Flaky Test kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess Key: KAFKA-8919 URL: https://issues.apache.org/jira/browse/KAFKA-8919 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Guozhang Wang {code} Stacktrace org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:830) at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:795) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1325) at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1333) at kafka.api.AuthorizerIntegrationTest.consumeRecords(AuthorizerIntegrationTest.scala:1772) at kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess(AuthorizerIntegrationTest.scala:813) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at jdk.internal.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at
[jira] [Updated] (KAFKA-8918) Flaky Test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation
[ https://issues.apache.org/jira/browse/KAFKA-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-8918: - Labels: flaky-test (was: ) > Flaky Test > org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation > > > Key: KAFKA-8918 > URL: https://issues.apache.org/jira/browse/KAFKA-8918 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Guozhang Wang >Priority: Major > Labels: flaky-test > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 15000. Timed out > waiting for expected tasks > {"foo":{"id":"foo","taskState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"cancelled":true,"status":{"node01":"done","node02":"done"}},"workerState":{"state":"DONE","taskId":"foo","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"status":"done"}}} > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:144) > at > org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation(CoordinatorTest.java:264) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:834) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8575) Investigate cleaning up task suspension (part 8)
[ https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931832#comment-16931832 ] Sophie Blee-Goldman commented on KAFKA-8575: Partly tackled by the first PR for KAFKA-8510, which removes suspension of standby tasks (for all protocols). Active tasks that are revoked will first be suspended and then closed if they are not reassigned. If we decide to remove eager rebalancing, we can clean this up further to just close them and get rid of suspension entirely. > Investigate cleaning up task suspension (part 8) > > > Key: KAFKA-8575 > URL: https://issues.apache.org/jira/browse/KAFKA-8575 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > With KIP-429 the suspend/resume of tasks may have minimal gains while adding > a lot of complexity and potential bugs. We should consider removing/cleaning > it up. -- This message was sent by Atlassian Jira (v8.3.2#803003)
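The revoked-active-task path described in this comment can be pictured as a small state machine. This is a hypothetical sketch: the class, enum and task ids are made up and do not mirror the Kafka Streams internals. It assumes eager rebalancing, where every active task is revoked before the new assignment arrives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of "suspend on revoke, then resume or close on assignment".
class TaskLifecycleSketch {

    enum State { RUNNING, SUSPENDED, CLOSED }

    final Map<String, State> activeTasks = new HashMap<>();

    // Eager rebalancing: each revoked active task is suspended first, not closed.
    void onRevoked(Set<String> revoked) {
        for (String id : revoked) {
            activeTasks.put(id, State.SUSPENDED);
        }
    }

    // New assignment: suspended tasks that came back resume; the rest are closed.
    void onAssigned(Set<String> assigned) {
        for (Map.Entry<String, State> e : activeTasks.entrySet()) {
            if (e.getValue() == State.SUSPENDED) {
                e.setValue(assigned.contains(e.getKey()) ? State.RUNNING : State.CLOSED);
            }
        }
        // Newly assigned tasks that were never owned start running.
        for (String id : assigned) {
            activeTasks.putIfAbsent(id, State.RUNNING);
        }
    }
}
```

Dropping suspension, as the comment suggests, would turn `onRevoked` into a direct close and remove the `SUSPENDED` state from the model.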
[jira] [Closed] (KAFKA-5635) KIP-181 Kafka-Connect integrate with kafka ReST Proxy
[ https://issues.apache.org/jira/browse/KAFKA-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava closed KAFKA-5635. > KIP-181 Kafka-Connect integrate with kafka ReST Proxy > - > > Key: KAFKA-5635 > URL: https://issues.apache.org/jira/browse/KAFKA-5635 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Dhananjay Patkar >Priority: Major > Labels: features, newbie > > Kafka connect currently uses kafka clients which directly connect to kafka > brokers. > In a use case wherein I have many kafka connect [producers] running remotely, > it's a challenge to configure broker information on every connect agent. > Also, in case of an IP change [upgrade or cluster re-creation], we need to > update every remote connect configuration. > If kafka connect source connectors talk to a ReST endpoint, then the client is > unaware of broker details. This way we can transparently upgrade / re-create > the kafka cluster as long as the ReST endpoint remains the same. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8603) Document upgrade path
[ https://issues.apache.org/jira/browse/KAFKA-8603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-8603: -- Assignee: Sophie Blee-Goldman > Document upgrade path > - > > Key: KAFKA-8603 > URL: https://issues.apache.org/jira/browse/KAFKA-8603 > Project: Kafka > Issue Type: Sub-task > Components: consumer, streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > Users need to follow a specific upgrade path in order to smoothly and safely > perform a live upgrade. We should very clearly document the steps needed to > upgrade a Consumer and a Streams app (note they will be different). -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8918) Flaky Test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation
[ https://issues.apache.org/jira/browse/KAFKA-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-8918: - Component/s: core > Flaky Test > org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation > > > Key: KAFKA-8918 > URL: https://issues.apache.org/jira/browse/KAFKA-8918 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 15000. Timed out > waiting for expected tasks > {"foo":{"id":"foo","taskState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"cancelled":true,"status":{"node01":"done","node02":"done"}},"workerState":{"state":"DONE","taskId":"foo","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"status":"done"}}} > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:144) > at > org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation(CoordinatorTest.java:264) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:834) > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8902) Benchmark cooperative vs eager rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-8902: --- Assignee: John Roesler > Benchmark cooperative vs eager rebalancing > -- > > Key: KAFKA-8902 > URL: https://issues.apache.org/jira/browse/KAFKA-8902 > Project: Kafka > Issue Type: Sub-task >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Major > > Cause rebalance and measure: > * overall throughput > * paused time > * (also look at the metrics from > (https://issues.apache.org/jira/browse/KAFKA-8609)): > ** accumulated rebalance time > Cluster/topic sizing: > ** 10 instances > ** 100 tasks (each instance gets 10 tasks) > ** 1000 stores (each task gets 10 stores) > * standbys = [0 and 1] > Rolling bounce: > * with and without state loss > * shorter and faster than session timeout (shorter in particular should be > interesting) > Expand (from 9 to 10) > Contract (from 10 to 9) > With and without saturation: > EOS: > * with and without > Topology: > * stateful > * windowed agg > Key Parameterizations: > 1. control: no rebalances > 2. rolling without state loss faster than session timeout > 3. expand 9 to 10 > 4. contract 10 to 9 -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8902) Benchmark cooperative vs eager rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-8902: Description: Cause rebalance and measure: * overall throughput * paused time * (also look at the metrics from (https://issues.apache.org/jira/browse/KAFKA-8609)): ** accumulated rebalance time Cluster/topic sizing: ** 10 instances ** 100 tasks (each instance gets 10 tasks) ** 1000 stores (each task gets 10 stores) * standbys = [0 and 1] Rolling bounce: * with and without state loss * shorter and faster than session timeout (shorter in particular should be interesting) Expand (from 9 to 10) Contract (from 10 to 9) With and without saturation: EOS: * with and without Topology: * stateful * windowed agg Key Parameterizations: 1. control: no rebalances 2. rolling without state loss faster than session timeout 3. expand 9 to 10 4. contract 10 to 9 > Benchmark cooperative vs eager rebalancing > -- > > Key: KAFKA-8902 > URL: https://issues.apache.org/jira/browse/KAFKA-8902 > Project: Kafka > Issue Type: Sub-task >Reporter: Sophie Blee-Goldman >Priority: Major > > Cause rebalance and measure: > * overall throughput > * paused time > * (also look at the metrics from > (https://issues.apache.org/jira/browse/KAFKA-8609)): > ** accumulated rebalance time > Cluster/topic sizing: > ** 10 instances > ** 100 tasks (each instance gets 10 tasks) > ** 1000 stores (each task gets 10 stores) > * standbys = [0 and 1] > Rolling bounce: > * with and without state loss > * shorter and faster than session timeout (shorter in particular should be > interesting) > Expand (from 9 to 10) > Contract (from 10 to 9) > With and without saturation: > EOS: > * with and without > Topology: > * stateful > * windowed agg > Key Parameterizations: > 1. control: no rebalances > 2. rolling without state loss faster than session timeout > 3. expand 9 to 10 > 4. contract 10 to 9 -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8496) Add system test for compatibility and upgrade path (part 6)
[ https://issues.apache.org/jira/browse/KAFKA-8496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-8496: --- Assignee: Bill Bejeck > Add system test for compatibility and upgrade path (part 6) > --- > > Key: KAFKA-8496 > URL: https://issues.apache.org/jira/browse/KAFKA-8496 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck >Priority: Major > > tests: > * compatibility > * upgrade (follow the existing upgrade pattern) > * make sure we don't get stuck in rebalance loops (eventually get to some > agreed generation, verify we don't trigger more rebalances after agreed > generation) > acceptance: > * verify the partition assignment is correct > * verify that Streams returns to processing -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8496) Add system test for compatibility and upgrade path (part 6)
[ https://issues.apache.org/jira/browse/KAFKA-8496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-8496: Description: tests: * compatibility * upgrade (follow the existing upgrade pattern) * make sure we don't get stuck in rebalance loops (eventually get to some agreed generation, verify we don't trigger more rebalances after agreed generation) acceptance: * verify the partition assignment is correct * verify that Streams returns to processing > Add system test for compatibility and upgrade path (part 6) > --- > > Key: KAFKA-8496 > URL: https://issues.apache.org/jira/browse/KAFKA-8496 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Priority: Major > > tests: > * compatibility > * upgrade (follow the existing upgrade pattern) > * make sure we don't get stuck in rebalance loops (eventually get to some > agreed generation, verify we don't trigger more rebalances after agreed > generation) > acceptance: > * verify the partition assignment is correct > * verify that Streams returns to processing -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8609) Add consumer metrics for rebalances (part 9)
[ https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-8609: --- Assignee: Guozhang Wang (was: Sophie Blee-Goldman) > Add consumer metrics for rebalances (part 9) > > > Key: KAFKA-8609 > URL: https://issues.apache.org/jira/browse/KAFKA-8609 > Project: Kafka > Issue Type: Sub-task >Reporter: Sophie Blee-Goldman >Assignee: Guozhang Wang >Priority: Major > > We would like to track some additional metrics on the consumer side related > to rebalancing as part of this KIP, including > # listener callback latency > ## partitions-revoked-time-avg > ## partitions-revoked-time-max > ## partitions-assigned-time-avg > ## partitions-assigned-time-max > ## partitions-lost-time-avg > ## partitions-lost-time-max > # rebalance rate (# rebalances per day) > ## rebalance-rate-per-day -- This message was sent by Atlassian Jira (v8.3.2#803003)
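A minimal sketch of the aggregates behind the proposed metric names, assuming latencies are recorded in milliseconds and "per day" means a sliding 24-hour window. The class is hypothetical; a real implementation would presumably register sensors with Kafka's own metrics library rather than keep raw samples in lists. Only the revoked-callback pair is shown; the assigned and lost pairs would follow the same pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical aggregation for partitions-revoked-time-avg/-max and
// rebalance-rate-per-day. Not the Kafka metrics API.
class RebalanceMetricsSketch {

    private static final long DAY_MS = 24L * 60 * 60 * 1000;

    private final List<Long> revokedLatenciesMs = new ArrayList<>();
    private final List<Long> rebalanceTimestampsMs = new ArrayList<>();

    void recordRevokedCallback(long latencyMs) { revokedLatenciesMs.add(latencyMs); }

    void recordRebalance(long timestampMs) { rebalanceTimestampsMs.add(timestampMs); }

    // Average onPartitionsRevoked latency; NaN when nothing was recorded.
    double revokedTimeAvg() {
        return revokedLatenciesMs.stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
    }

    // Maximum onPartitionsRevoked latency; 0 when nothing was recorded.
    long revokedTimeMax() {
        return revokedLatenciesMs.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    // Number of rebalances in the 24-hour window ending at nowMs.
    long rebalancesPerDay(long nowMs) {
        long windowStart = nowMs - DAY_MS;
        return rebalanceTimestampsMs.stream().filter(t -> t > windowStart && t <= nowMs).count();
    }
}
```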
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931777#comment-16931777 ] Frederic Tardif commented on KAFKA-8523: Just to clarify my comment above: `behavior.on.null.values` already exists (at least on the Elasticsearch connector), so it is important to keep null values when we want them interpreted as a `delete` of an Elasticsearch index entry. Applying the InsertField transform to null values would completely defeat the purpose in this scenario. I strongly believe the transform should skip null values, or this transform behaviour should at least be configurable. !image-2019-09-17-15-53-44-038.png! > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
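A minimal sketch of the behaviour argued for in this comment, namely passing tombstones through untouched, as opposed to the ticket's alternative of building a new value map from scratch. This is a simplified, hypothetical stand-in for the schemaless path only: the real `InsertField` transform operates on `ConnectRecord`s, which the sketch omits, and the class and method names are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified schemaless "insert field" step that tolerates tombstones.
class TombstoneSafeInsertField {

    static Map<String, Object> apply(Map<String, Object> value, String field, Object fieldValue) {
        if (value == null) {
            // Tombstone: leave it null so a downstream sink can still treat it as a delete.
            return null;
        }
        Map<String, Object> updated = new HashMap<>(value); // copy; don't mutate the input
        updated.put(field, fieldValue);
        return updated;
    }
}
```

Returning the null unchanged preserves the delete semantics the comment relies on; making this pass-through optional would correspond to the "at least configurable" alternative.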
[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931777#comment-16931777 ] Frederic Tardif edited comment on KAFKA-8523 at 9/17/19 7:58 PM: - Just to clarify my comment above: `behavior.on.null.values` already exists (at least on the Elasticsearch connector), so it is important to keep null values when we want them interpreted as a `delete` of an Elasticsearch index entry. Applying the InsertField transform to null values would completely defeat the purpose in this scenario. I strongly believe the transform should skip null values, or this transform behaviour should at least be configurable. !image-2019-09-17-15-53-44-038.png|width=693,height=130! was (Author: frederic.tardif): Just to clarify my comment above: `behavior.on.null.values` already exists (at least on the Elasticsearch connector), so it is important to keep null values when we want them interpreted as a `delete` of an Elasticsearch index entry. Applying the InsertField transform to null values would completely defeat the purpose in this scenario. I strongly believe the transform should skip null values, or this transform behaviour should at least be configurable. !image-2019-09-17-15-53-44-038.png! 
> InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931776#comment-16931776 ] Ryanne Dolan commented on KAFKA-7500: - [~qihong] That's all correct :) In fact, KIP-382 mentions a "primary" cluster (in your case "C") and a single Connect cluster being used to replicate data between any number of other clusters. One way to do this is with a SinkConnector (part of the KIP and coming later) s.t. records travel _through_ the primary cluster to other clusters. I believe you could also override the Connect Worker's internal Producer config to write directly to another cluster, s.t., as you say, state is stored in one cluster but records go to another. I've never tried that, ymmv, but I suspect it'd work as you've described. Notice that, generally, you will end up with multiple Connect clusters anyway -- one in each DC -- for performance and HA reasons. At that point you are back to managing multiple Connectors on multiple Connect clusters. MM2's top-level driver manages that complexity for you by automatically spinning up and configuring all the required Workers. re a REST API, the MM2 driver essentially turns off the REST service inside each Herder. This is because the current Connect REST API doesn't support having multiple Herders or Worker configs, so we'd need to sorta abuse the Connect REST API to get it to work. However, there was much discussion around KIP-382 re an MM2 REST API, and there are several good ideas floating around. These were ultimately deferred, but not ruled out. Also, fwiw, I have successfully run an MM2 cluster with the Connect REST API turned on, with each Herder's endpoints wrapped in a higher-level API. I have done this successfully with a reverse proxy as well as with a fork of Connect's RestServer. This enables you to start/stop/configure individual connectors within the MM2 cluster, if you so wish. And finally: MM2 is very pluggable. 
For example, you can drop in a TopicFilter that grabs dynamic whitelists from somewhere. I happen to know this works very well :) > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
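Ryanne mentions dropping in a TopicFilter that pulls a dynamic whitelist from somewhere else. The plain-Java sketch below illustrates that idea under stated assumptions. `TopicFilterLike` only mimics a topic-name predicate and is not MM2's actual interface, so check the real `TopicFilter` in your Kafka version before adapting it. `DynamicWhitelistFilter` and its `Supplier`-based source are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-in for a pluggable topic filter: a single predicate on topic names.
// This is NOT the real MM2 TopicFilter interface.
interface TopicFilterLike {
    boolean shouldReplicateTopic(String topic);
}

// Hypothetical filter whose whitelist (a set of regexes) comes from an external source,
// e.g. a config service or a file, modeled here as a Supplier.
final class DynamicWhitelistFilter implements TopicFilterLike {
    private final Supplier<Set<String>> whitelistSource;
    // null means "empty whitelist": replicate nothing.
    private final AtomicReference<Pattern> current = new AtomicReference<>();

    DynamicWhitelistFilter(Supplier<Set<String>> whitelistSource) {
        this.whitelistSource = whitelistSource;
        refresh();
    }

    // Re-read the whitelist and atomically swap in a newly compiled pattern.
    // Call this periodically, e.g. from a ScheduledExecutorService.
    void refresh() {
        Set<String> regexes = whitelistSource.get();
        if (regexes.isEmpty()) {
            current.set(null);
            return;
        }
        // Wrap each regex in a non-capturing group so alternation stays well-formed.
        String combined = regexes.stream()
                .map(r -> "(?:" + r + ")")
                .collect(Collectors.joining("|"));
        current.set(Pattern.compile(combined));
    }

    @Override
    public boolean shouldReplicateTopic(String topic) {
        Pattern p = current.get();
        return p != null && p.matcher(topic).matches();
    }
}
```

Swapping an immutable compiled `Pattern` through an `AtomicReference` keeps `shouldReplicateTopic` lock-free while a refresh is in progress. Changes in the source only become visible after the next `refresh()`.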
[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederic Tardif updated KAFKA-8523: --- Attachment: image-2019-09-17-15-53-44-038.png > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > Attachments: image-2019-09-17-15-53-44-038.png > > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch.
[jira] [Updated] (KAFKA-8912) TimeoutException cause is insufficiently documented for the KafkaProducer
[ https://issues.apache.org/jira/browse/KAFKA-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ramkishan updated KAFKA-8912: - Affects Version/s: 2.3.0 > TimeoutException cause is insufficiently documented for the KafkaProducer > - > > Key: KAFKA-8912 > URL: https://issues.apache.org/jira/browse/KAFKA-8912 > Project: Kafka > Issue Type: Bug > Components: documentation, producer >Affects Versions: 1.0.1, 2.3.0 >Reporter: Ramkishan >Priority: Major > > The javadocs of the org.apache.kafka.clients.producer.KafkaProducer class give > only a partial description of the TimeoutException, wherever > applicable. > The document reads - > "{{[TimeoutException|https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]}} > - If the time taken for fetching metadata or allocating memory for the > record has surpassed {{max.block.ms}}." > However, this exception can also be thrown if the RTM occurs > while the message is in the accumulator and the batch expires. The current > description misleads developers who try to tune the producer > config when they face this issue > [https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]
[jira] [Updated] (KAFKA-8912) TimeoutException cause is insufficiently documented for the KafkaProducer
[ https://issues.apache.org/jira/browse/KAFKA-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ramkishan updated KAFKA-8912: - Affects Version/s: (was: 2.2.1) (was: 0.10.0.1) 1.0.1
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931761#comment-16931761 ] Randall Hauch commented on KAFKA-8523: -- [~gunnar.morling], good point. I think a tombstone should be left unmodified by this SMT, which means no need for a KIP or a config change. I think converting or dropping a tombstone are separate SMTs.
[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931761#comment-16931761 ] Randall Hauch edited comment on KAFKA-8523 at 9/17/19 7:32 PM: --- [~gunnar.morling], good point. I think a tombstone should be left unmodified by this SMT, which means no need for a KIP or a config change. I think dropping a tombstone or replacing a tombstone with a non-null value are separate SMTs. was (Author: rhauch): [~gunnar.morling], good point. I think a tombstone should be left unmodified by this SMT, which means no need for a KIP or a config change. I think converting or dropping a tombstone are separate SMTs.
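The behaviour Randall settles on (leave tombstones unmodified) can be sketched in plain Java. This is not the actual `InsertField` source: `SimpleRecord` and `TombstoneSafeInsertField` are hypothetical stand-ins for a Connect record and a schemaless insert-field step, chosen so the sketch compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal record: a key and a schemaless value (null = tombstone).
final class SimpleRecord {
    final Object key;
    final Object value;

    SimpleRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Sketch of a schemaless "insert field" step that passes tombstones through untouched.
final class TombstoneSafeInsertField {
    private final String fieldName;
    private final Object fieldValue;

    TombstoneSafeInsertField(String fieldName, Object fieldValue) {
        this.fieldName = fieldName;
        this.fieldValue = fieldValue;
    }

    SimpleRecord apply(SimpleRecord record) {
        if (record.value == null) {
            // Tombstone: return it as-is so its compaction semantics are preserved.
            return record;
        }
        if (!(record.value instanceof Map)) {
            throw new IllegalArgumentException("Only Map values are supported without a schema, found: "
                    + record.value.getClass().getName());
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> original = (Map<String, Object>) record.value;
        // Copy so the input record's value is not mutated.
        Map<String, Object> updated = new HashMap<>(original);
        updated.put(fieldName, fieldValue);
        return new SimpleRecord(record.key, updated);
    }
}
```

Returning the same record instance for a tombstone makes the "no-op" explicit; dropping or replacing tombstones would then be the job of separate transforms, as the comment suggests.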
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931743#comment-16931743 ] Qihong Chen edited comment on KAFKA-7500 at 9/17/19 7:22 PM: - Hi [~ryannedolan], I just listened to your Kafka Power Chat talk, thanks! I have a follow-up question about the last question (from me) that you answered in the talk. You said you prefer a dedicated MM2 cluster over running MM2 in a Connect cluster, since you can use fewer clusters to replicate among multiple Kafka clusters. But there's no REST API for a dedicated MM2 cluster that can report the status of the replication streams or update the replication configuration. Any configuration change means updating the config files and restarting all MM2 instances, is that right? Or did I miss that a dedicated MM2 cluster does provide a REST API for admin and monitoring? If so, where is it? If my understanding is correct, we can achieve the same thing with MM2 in a Connect cluster. Assume there are 3 Kafka clusters: A, B, and C. Set up a Connect cluster against C (meaning all topics for the connectors' data and state go to cluster C), then set up MM2 connectors to replicate data and metadata A -> B and B -> A. If this is correct, we can use Kafka cluster C plus the Connect cluster running against it to replicate data among more Kafka clusters, like A, B, D, and even more. Of course, this needs a more complicated configuration, which requires a deeper understanding of how the MM2 connectors work. In this scenario, the Connect cluster provides a REST API to administer and monitor all the connectors. This will be useful for people who can't use Stream Replication Manager from Cloudera or Kafka replicator from Confluent for some reason. Is this right?
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931754#comment-16931754 ] Sophie Blee-Goldman commented on KAFKA-8897: I suppose we could log a warning in #setCompactionOptionsFIFO no matter what ttl was set to, in case they did for some reason call setTtl(0). Not sure how much we care about supporting no-op methods, though. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471.
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931743#comment-16931743 ] Qihong Chen commented on KAFKA-7500: Hi [~ryannedolan], I just listened to your Kafka Power Chat talk, thanks! I have a follow-up question about the last question (from me) that you answered in the talk. You said you prefer a dedicated MM2 cluster over running MM2 in a Connect cluster, since you can use fewer clusters to replicate among multiple Kafka clusters. But there's no REST API for a dedicated MM2 cluster that can report the status of the replication streams or update the replication configuration. Any configuration change means updating the config files and restarting all MM2 instances, is that right? Or did I miss that a dedicated MM2 cluster does provide a REST API for admin and monitoring? If so, where is it? If my understanding is correct, we can achieve the same thing with MM2 in a Connect cluster. Assume there are 3 Kafka clusters: A, B, and C. Set up a Connect cluster against C (meaning all topics for the connectors' data and state go to cluster C), then set up MM2 connectors to replicate data and metadata A -> B and B -> A. If this is correct, we can use Kafka cluster C plus the Connect cluster running against it to replicate data among more Kafka clusters, like A, B, D, and even more. Of course, this needs a more complicated configuration, which requires a deeper understanding of how the MM2 connectors work. In this scenario, the Connect cluster provides a REST API to administer and monitor all the connectors. This will be useful for people who can't use Stream Replication Manager from Cloudera or Kafka replicator from Confluent for some reason. Is this right?
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931733#comment-16931733 ] Matthias J. Sax commented on KAFKA-8897: Might be good enough – not 100% safe as one _could_ call `setTtl(0)` though, too.
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931712#comment-16931712 ] Sophie Blee-Goldman commented on KAFKA-8897: We could intercept the CompactionOptionsFIFO object in RocksDBAdapterClassWithLongName#setCompactionOptionsFIFO and get its ttl; if it's anything other than 0 (the default, meaning disabled), we know they've called the removed method.
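Sophie's idea of intercepting the FIFO options in the adapter's `setCompactionOptionsFIFO` can be sketched with stand-in classes. `FifoOptionsStandIn` and `OptionsAdapterSketch` are hypothetical (they model only the TTL, not RocksDB's real `CompactionOptionsFIFO`), and warnings are collected in a list instead of a logger so the behaviour is easy to check. The last case shows the limitation Matthias points out: an explicit `setTtl(0)` cannot be told apart from the default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RocksDB's CompactionOptionsFIFO: only the TTL is modeled.
final class FifoOptionsStandIn {
    private long ttl = 0; // 0 = disabled, the default

    void setTtl(long ttl) {
        this.ttl = ttl;
    }

    long ttl() {
        return ttl;
    }
}

// Hypothetical adapter showing the proposed check; a real one would log and then delegate.
final class OptionsAdapterSketch {
    final List<String> warnings = new ArrayList<>();

    void setCompactionOptionsFIFO(FifoOptionsStandIn options) {
        // A non-zero TTL can only come from the user calling setTtl(...).
        // setTtl(0) is indistinguishable from "never called" and goes undetected.
        if (options.ttl() != 0) {
            warnings.add("FIFO compaction TTL " + options.ttl()
                    + " is set via a method that may not be available after the RocksDB upgrade");
        }
        // ... forward the options to the wrapped RocksDB object (omitted in this sketch).
    }
}
```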
[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931696#comment-16931696 ] Matthias J. Sax commented on KAFKA-8377: {quote}1. Does this mean that every stateful operation on a KTable must be materialized? {quote} No. We only need to materialize the result of a KTable operation iff the next operation is a stateful transformValues. {quote}2. If that's the case, should the user be notified that they need to use a materialized store for such operations? E.g., after the topology optimization we can suggest that a materialized store needs to be created. I'm not 100% sure if we must force-create a StateStore (since users may want to pass specific configurations to the state store) {quote} The user does not need to create a store IMHO – we can just do this internally. However, to be able to materialize the result, we need to know the corresponding `Serdes` – not sure if we need to do anything special about it... But in some cases, if we force materialization we might not have the correct `Serdes`, and thus a user would need to specify them upstream via `Materialized` to avoid runtime errors (if they don't do this and hit a runtime error, it might be hard for users to understand the problem...) {quote}3. Is it also possible that users are materializing the state in Redis or some other caching mechanism? {quote} Theoretically yes. But we don't need to worry about this case. If a user plugs in a custom store, they would need to specify this upstream anyway, forcing a materialization explicitly. > KTable#transformValue might lead to incorrect result in joins > - > > Key: KAFKA-8377 > URL: https://issues.apache.org/jira/browse/KAFKA-8377 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Assignee: Aishwarya Pradeep Kumar >Priority: Major > Labels: newbie++ > > Kafka Streams uses an optimization to not materialize every result KTable.
If > a non-materialized KTable is input to a join, the lookup into the table > results in a lookup of the parent table plus a call to the operator. For > example, > {code:java} > KTable nonMaterialized = materializedTable.filter(...); > KTable table2 = ... > table2.join(nonMaterialized,...){code} > If there is a table2 input record, the lookup to the other side is performed > as a lookup into materializedTable plus applying the filter(). > For stateless operations like filter(), this is safe. However, > #transformValues() might have an attached state store. Hence, when an input > record r is processed by #transformValues() with current state S, it might > produce an output record r' (that is not materialized). When the join later > does a lookup to get r from the parent table, there is no guarantee that > #transformValues() again produces r' because its state might not be the same > any longer. > Hence, it seems to be required to always materialize the result of a > KTable#transformValues() operation if there is state. Note that if there > were a consecutive filter() after transformValues(), it would also be ok to > materialize the filter() result. Furthermore, if there is no downstream > join(), materialization is also not required. > Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful > `#transformValues()` operator.
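The failure mode described in the ticket can be reproduced with a plain-Java simulation; no Kafka Streams classes are involved, and `CountingTransformer` and `TransformValuesSimulation` are hypothetical names. A transformer whose output depends on an invocation counter stands in for a stateful `#transformValues()`. Recomputing on lookup (what a value getter over a non-materialized result effectively does) yields a different r' than the one forwarded downstream, while reading a stored result does not.

```java
import java.util.HashMap;
import java.util.Map;

// A transformer whose output depends on internal state (an invocation counter).
final class CountingTransformer {
    private int calls = 0;

    String transform(String value) {
        calls++;
        return value + "#" + calls;
    }
}

// Plain-Java simulation of the problem described in the ticket.
final class TransformValuesSimulation {
    private final Map<String, String> parentTable = new HashMap<>();        // materialized upstream table
    private final Map<String, String> materializedResult = new HashMap<>(); // models materializing r'
    private final CountingTransformer transformer = new CountingTransformer();

    // An upstream update r: run the stateful transformer once, store and forward r'.
    String process(String key, String value) {
        parentTable.put(key, value);
        String output = transformer.transform(value);
        materializedResult.put(key, output);
        return output;
    }

    // Value-getter style lookup without materialization: re-read r and re-apply the transformer.
    String lookupByRecomputing(String key) {
        return transformer.transform(parentTable.get(key));
    }

    // Lookup against the materialized result: returns exactly the r' that was forwarded.
    String lookupMaterialized(String key) {
        return materializedResult.get(key);
    }
}
```

After `process("k", "v")` forwards `v#1`, `lookupMaterialized("k")` still returns `v#1`, but `lookupByRecomputing("k")` returns `v#2` because the transformer's state has moved on.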
[jira] [Commented] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
[ https://issues.apache.org/jira/browse/KAFKA-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931677#comment-16931677 ] Matthias J. Sax commented on KAFKA-8914: Are your brokers on version 2.3.0? Static group membership does not work with older brokers. > Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error > -- > > Key: KAFKA-8914 > URL: https://issues.apache.org/jira/browse/KAFKA-8914 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.3.0 >Reporter: Igamr Palsenberg >Priority: Major > > > Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any > value (I tried valid numbers and just some string values) will result in: > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'version': > java.nio.BufferUnderflowException org.apache.kafka.common.protocol.types.SchemaException: > Error reading field 'version': java.nio.BufferUnderflowException at > org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) at > org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > > Kafka broker: > Kafka 2.3.0, installed using Brew.
Default config, except the bind IP, that > is set explicitly to localhost. >
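For reference, static membership is opted into on the consumer by setting `group.instance.id` (the string value of `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG`); per Matthias's reply, the brokers must be on 2.3.0 or later for it to work. The minimal sketch below uses plain string keys so it compiles without the Kafka client jar. The helper name, group name, and instance id are illustrative, and the sketch does not by itself explain the `SchemaException` reported above.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties for static group membership.
final class StaticMembershipProps {
    static Properties build(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG. It should be unique per
        // consumer in the group and stay the same across restarts of that consumer.
        props.put("group.instance.id", instanceId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

These properties would then be passed to a `KafkaConsumer` constructor as usual.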
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931671#comment-16931671 ] Gunnar Morling commented on KAFKA-7052: --- I'm wondering: is it solely a question of having a more meaningful exception, or should null rather be returned in this case? It seems one might want either, depending on the use case. > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1
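Gunnar's question (clearer exception vs. returning null) can be illustrated with a plain-Java sketch that supports both behaviours. It assumes, without confirmation from the ticket, that the NPE comes from a null record key. `ExtractFieldSketch` and its `returnNullOnNull` flag are hypothetical and not part of Kafka Connect.

```java
import java.util.Map;

// Sketch of a schemaless "extract field" step with explicit null handling.
// Not the actual ExtractField source.
final class ExtractFieldSketch {
    private final String fieldName;
    private final boolean returnNullOnNull; // true: pass null through; false: fail with a clear message

    ExtractFieldSketch(String fieldName, boolean returnNullOnNull) {
        this.fieldName = fieldName;
        this.returnNullOnNull = returnNullOnNull;
    }

    // partName is "key" or "value" and is used only to make the error message specific.
    Object apply(Object part, String partName) {
        if (part == null) {
            if (returnNullOnNull) {
                return null;
            }
            throw new IllegalStateException("Cannot extract field '" + fieldName
                    + "' because the record " + partName + " is null");
        }
        if (!(part instanceof Map)) {
            throw new IllegalStateException("Expected a Map as the record " + partName
                    + " but found " + part.getClass().getName());
        }
        return ((Map<?, ?>) part).get(fieldName);
    }
}
```

Either branch replaces the bare `NullPointerException` with an outcome the user can act on; which default is appropriate depends on the use case, as the comment notes.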
[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer
[ https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931670#comment-16931670 ] Matthias J. Sax commented on KAFKA-8911: Thanks for the PR [~atais] – you can re-trigger a build by adding a comment on the PR that contains the phrase "retest this please". I would hold off on rerunning the tests until you get a review, though – if you need to update the PR, each `push` will trigger retesting anyway. Jenkins is often overloaded and we try to save cycles if we can. Btw: please ask questions directly on the PR instead of the Jira ticket – that's easier for us. Thanks a lot. > Implicit TimeWindowedSerde creates Serde with null inner serializer > --- > > Key: KAFKA-8911 > URL: https://issues.apache.org/jira/browse/KAFKA-8911 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > > {{Serdes.scala}} contains an implicit def timeWindowedSerde as below: > > {code:java} > implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new > WindowedSerdes.TimeWindowedSerde[T]() > {code} > It creates a new {{TimeWindowedSerde}} without an inner serializer, which is a > bug. Even {{WindowedSerdes.java}} says that the empty constructor is for > reflection.
> {code:java} > // Default constructor needed for reflection object creation > public TimeWindowedSerde() { > super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); > } > public TimeWindowedSerde(final Serde<T> inner) { > super(new TimeWindowedSerializer<>(inner.serializer()), new > TimeWindowedDeserializer<>(inner.deserializer())); > } > {code} > The above fails for me when I try to implicitly access the right Serde: > {code:java} > private val twSerde = implicitly[TimeWindowedSerde[String]] > {code} > and I have to create the object properly on my own: > {code} > private val twSerde = new > WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]]) > {code} > It could be fixed with a proper call in {{Serdes.scala}}: > {code} > implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): > WindowedSerdes.TimeWindowedSerde[T] = > new WindowedSerdes.TimeWindowedSerde[T](tSerde) > {code} > But maybe the scope of the default constructor for {{TimeWindowedSerde}} > should also be changed? > BR, Michał
[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931662#comment-16931662 ] Gunnar Morling edited comment on KAFKA-8523 at 9/17/19 5:08 PM: Yes, I was arriving at pretty much the same conclusion; that said, is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't really see a good reason to inject any value into a tombstone; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest that I simply update my PR so that it passes on tombstones unmodified, and that should be good enough. WDYT? Note: if we wanted to have an option, I think it would allow either passing on tombstones unmodified (as suggested above) OR inserting the new field, i.e. what gets returned wouldn't really be an empty map, but a map with a single entry for that new field (which is what the current PR does). This could be done, but as argued, it'd change the message's nature of being a tombstone, so it's probably not desirable? was (Author: gunnar.morling): Yes, I was arriving at pretty much the same conclusion; that said, is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't really see a good reason to inject any value into a tombstone; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest that I simply update my PR so that it passes on tombstones unmodified, and that should be good enough. WDYT?
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931665#comment-16931665 ] Matthias J. Sax commented on KAFKA-8897: {quote}Seems like the lack of deprecation, if anything, means we can only do it in a major release. But if we do want to do it in a minor release, I think we should at least log a warning in 2.4 if that method is called and delay the upgrade until 2.5. {quote} I don't see how we can do this. While I agree with the incentive not to break compatibility, I don't see how *_we_* could deprecate anything. We don't control `CompactionOptionsFIFO`, and if user code creates this class, how could we possibly intercept the call to `setTtl()` and log a warning? We only control the `Options` that we pass as a method parameter.
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931662#comment-16931662 ] Gunnar Morling commented on KAFKA-8523: --- Yes, I was arriving at pretty much the same conclusion; that said, is an option even needed? I've come to think that a tombstone should remain just that, a tombstone, i.e. I don't really see a good reason to inject any value into a tombstone; after all, it has specific semantics -- enabling compaction -- which shouldn't be altered. So I would suggest that I simply update my PR so that it passes on tombstones unmodified, and that should be good enough. WDYT?
[jira] [Commented] (KAFKA-8839) Improve logging in Kafka Streams around debugging task lifecycle
[ https://issues.apache.org/jira/browse/KAFKA-8839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931649#comment-16931649 ] ASF GitHub Bot commented on KAFKA-8839: --- guozhangwang commented on pull request #7258: KAFKA-8839 : Improve streams debug logging URL: https://github.com/apache/kafka/pull/7258 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve logging in Kafka Streams around debugging task lifecycle > - > > Key: KAFKA-8839 > URL: https://issues.apache.org/jira/browse/KAFKA-8839 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Vinoth Chandar >Assignee: Vinoth Chandar >Priority: Major > > As a follow up to KAFKA-8831, this Jira will track efforts around improving > logging/docs around > > * Being able to follow state of tasks from assignment to restoration > * Better detection of misconfigured state store dir > * Docs giving guidance for rebalance time and state store config -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931646#comment-16931646 ] Randall Hauch commented on KAFKA-8523: -- It seems strange to me that applying InsertField to a null (tombstone) record would create an empty map / struct. Rather, it seems like we need a configuration property that defines the behavior on null. However, that means we need a small KIP to define this property. I'd suggest a property like `behavior.on.null` whose values are either `empty` or `skip`. If we consider records with null values as tombstones (meaning they represent deleted records), defaulting to `skip` makes the most sense to me. We do have a similar problem with `ExtractField` (see KAFKA-7052), and it'd be great if the KIP added that configuration item there as well. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
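Randall's `behavior.on.null` idea (and Gunnar's pass-through variant, which corresponds to `skip`) can be illustrated with a small, dependency-free Java model. This is a sketch under stated assumptions, not the actual `InsertField` code and not the Connect API: the record value is modeled as a schemaless `Map`, and `behavior.on.null`, `BehaviorOnNull`, and `insertField` are hypothetical names, since the property was only proposed in this thread and would still need a KIP.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, dependency-free model of a schemaless InsertField with a
 * configurable tombstone policy. Not the real Kafka Connect transform.
 */
public class InsertFieldNullPolicy {

    /** Values of the proposed (hypothetical) `behavior.on.null` property. */
    public enum BehaviorOnNull { SKIP, EMPTY }

    /**
     * Returns a copy of the value with fieldName=fieldValue added.
     * A null value is treated as a tombstone: SKIP returns it unchanged so that
     * log compaction still sees a delete; EMPTY inserts into a fresh empty map.
     */
    public static Map<String, Object> insertField(Map<String, Object> value,
                                                  String fieldName,
                                                  Object fieldValue,
                                                  BehaviorOnNull behavior) {
        if (value == null && behavior == BehaviorOnNull.SKIP) {
            return null; // the tombstone stays a tombstone
        }
        // Copy rather than mutate the caller's map; a null value starts from empty.
        Map<String, Object> updated = new HashMap<>();
        if (value != null) {
            updated.putAll(value);
        }
        updated.put(fieldName, fieldValue);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42);
        // A regular record gains the new field.
        System.out.println(insertField(row, "source", "db1", BehaviorOnNull.SKIP));
        // A tombstone under SKIP is passed through.
        System.out.println(insertField(null, "source", "db1", BehaviorOnNull.SKIP)); // prints null
        // A tombstone under EMPTY becomes a one-field map.
        System.out.println(insertField(null, "source", "db1", BehaviorOnNull.EMPTY)); // prints {source=db1}
    }
}
```

Defaulting to `SKIP` keeps compaction semantics intact, while `EMPTY` corresponds to the "build up a new value map from scratch" approach from the issue description.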
[jira] [Updated] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side
[ https://issues.apache.org/jira/browse/KAFKA-8917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pascal Büttiker updated KAFKA-8917: --- Description: As described in KAFKA-7718, headers are promoted by the "triggering" record in stateful operations such as Joins. This was very confusing and we spent quite some time debugging this. While we would ideally have full control over this behaviour, as KAFKA-7718 proposes, I hope we can address some of the randomness before then: * Inner-Join: Keep as is (use the headers of the triggering record) * Full-Join: Keep as is (use the headers of the triggering record) * Left-Join: *Always pick the headers of the left record.* * Right-Join: *Always pick the headers of the right record.* This behaviour would solve the most pressing issues when dealing with headers in Kafka Streams. *Motivation*: In a CDC scenario, we usually have to resolve the relational database joins on our side, which usually means we enrich one record from a couple of other topics. So for a typical CDC use-case, Left-Joins allow the most basic de-normalisations from relational data models. Therefore, if we can solve the header behaviour for left/right joins, we can actually use Kafka Streams in a CDC scenario with joins and headers. We depend on headers, especially when dealing with tombstone records. There is no other way to store additional information. If we do not use tombstone records, all default Kafka features around compacted topics and KTables are no longer usable. We are able to use custom Transformers to generate the headers (basically patching in the missing header support in Kafka Streams), but as soon as we use Join/Aggregate we lose control over the headers. was: As described in KAFKA-7718, headers are promoted by the "triggering" record in stateful operations such as Joins. This was very confusing and we spent quite some time debugging this.
While we would ideally have full control over this behaviour, as KAFKA-7718 proposes, I hope we can address some of the randomness before then: * Inner-Join: Keep as is (use the headers of the triggering record) * Full-Join: Keep as is (use the headers of the triggering record) * Left-Join: *Always pick the headers of the left record.* * Right-Join: *Always pick the headers of the right record.* This behaviour would solve the most pressing issues when dealing with headers in Kafka Streams. *Motivation*: In a CDC scenario, we usually have to resolve the relational database joins on our side, which usually means we enrich one record from a couple of other topics. So for a typical CDC use-case, Left-Joins allow the most basic de-normalisations from relational data models. Therefore, if we can solve the header behaviour for left/right joins, we can actually use Kafka Streams in a CDC scenario with joins and headers. We depend on headers, especially when dealing with tombstone records. There is no other way to store additional information. If we do not use tombstone records, all default Kafka features around compacted topics and KTables are no longer usable. We are able to use custom Transformers to generate the headers (basically patching in the missing header support in Kafka Streams), but as soon as we use Join/Aggregate we lose control over the headers. > When performing a Left/Right-Join, pick the headers of the same side > > > Key: KAFKA-8917 > URL: https://issues.apache.org/jira/browse/KAFKA-8917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Pascal Büttiker >Priority: Major > > As described in KAFKA-7718, headers are promoted by the "triggering" record > in stateful operations such as Joins. This was very confusing and we spent > quite some time debugging this.
> While we would ideally have full control over this behaviour, as KAFKA-7718 > proposes, I hope we can address some of the randomness before then: > * Inner-Join: Keep as is (use the headers of the triggering record) > * Full-Join: Keep as is (use the headers of the triggering record) > * Left-Join: *Always pick the headers of the left record.* > * Right-Join: *Always pick the headers of the right record.* > This behaviour would solve the most pressing issues when dealing with headers > in Kafka Streams. > *Motivation*: > In a CDC scenario, we usually have to resolve the relational database joins > on our side, which usually means we enrich one record from a couple of other > topics. So for a typical CDC use-case, Left-Joins allow the most basic > de-normalisations from relational data models. Therefore, if we can solve > the header behaviour for left/right joins, we can actually use Kafka Streams > in a CDC scenario with joins and headers.
[jira] [Updated] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side
[ https://issues.apache.org/jira/browse/KAFKA-8917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pascal Büttiker updated KAFKA-8917: --- Description: As described in KAFKA-7718, headers are promoted by the "triggering" record in stateful operations such as Joins. This was very confusing and we spent quite some time debugging this. While we would ideally have full control over this behaviour, as KAFKA-7718 proposes, I hope we can address some of the randomness before then: * Inner-Join: Keep as is (use the headers of the triggering record) * Full-Join: Keep as is (use the headers of the triggering record) * Left-Join: *Always pick the headers of the left record.* * Right-Join: *Always pick the headers of the right record.* This behaviour would solve the most pressing issues when dealing with headers in Kafka Streams. *Motivation*: In a CDC scenario, we usually have to resolve the relational database joins on our side, which usually means we enrich one record from a couple of other topics. So for a typical CDC use-case, Left-Joins allow the most basic de-normalisations from relational data models. Therefore, if we can solve the header behaviour for left/right joins, we can actually use Kafka Streams in a CDC scenario with joins and headers. We depend on headers, especially when dealing with tombstone records. There is no other way to store additional information. If we do not use tombstone records, all default Kafka features around compacted topics and KTables are no longer usable. We are able to use custom Transformers to generate the headers (basically patching in the missing header support in Kafka Streams), but as soon as we use Join/Aggregate we lose control over the headers. was: As described in KAFKA-7718, headers are promoted by the "triggering" record in stateful operations such as Joins. This was very confusing and we spent quite some time debugging this.
While we would ideally have full control over this behaviour, as KAFKA-7718 proposes, I hope we can address some of the randomness before then: * Inner-Join: Keep as is (use the headers of the triggering record) * Full-Join: Keep as is (use the headers of the triggering record) * Left-Join: *Always pick the headers of the left record.* * Right-Join: *Always pick the headers of the right record.* This behaviour would solve the most pressing issues when dealing with headers in Kafka Streams. *Motivation*: We depend on headers, especially when dealing with tombstone records. There is no other way to store additional information. If we do not use tombstone records, all default Kafka features around compacted topics and KTables are no longer usable. We are able to use custom Transformers to generate the headers (basically patching in the missing header support in Kafka Streams), but as soon as we use Join/Aggregate we lose control over the headers. > When performing a Left/Right-Join, pick the headers of the same side > > > Key: KAFKA-8917 > URL: https://issues.apache.org/jira/browse/KAFKA-8917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Pascal Büttiker >Priority: Major > > As described in KAFKA-7718, headers are promoted by the "triggering" record > in stateful operations such as Joins. This was very confusing and we spent > quite some time debugging this. > While we would ideally have full control over this behaviour, as KAFKA-7718 > proposes, I hope we can address some of the randomness before then: > * Inner-Join: Keep as is (use the headers of the triggering record) > * Full-Join: Keep as is (use the headers of the triggering record) > * Left-Join: *Always pick the headers of the left record.* > * Right-Join: *Always pick the headers of the right record.* > This behaviour would solve the most pressing issues when dealing with headers > in Kafka Streams.
> *Motivation*: > In a CDC scenario, we usually have to resolve the relational database joins > on our side, which usually means we enrich one record from a couple of other > topics. So for a typical CDC use-case, Left-Joins allow the most basic > de-normalisations from relational data models. Therefore, if we can solve > the header behaviour for left/right joins, we can actually use Kafka Streams > in a CDC scenario with joins and headers. > We depend on headers, especially when dealing with tombstone records. There > is no other way to store additional information. If we do not use tombstone > records, all default Kafka features around compacted topics and KTables are > no longer usable. We are able to use custom Transformers to generate the > headers (basically patching in the missing header support in Kafka Streams), but > as soon as we use
[jira] [Created] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side
Pascal Büttiker created KAFKA-8917: -- Summary: When performing a Left/Right-Join, pick the headers of the same side Key: KAFKA-8917 URL: https://issues.apache.org/jira/browse/KAFKA-8917 Project: Kafka Issue Type: Improvement Components: streams Reporter: Pascal Büttiker As described in KAFKA-7718, headers are promoted by the "triggering" record in stateful operations such as Joins. This was very confusing and we spent quite some time debugging this. While we would ideally have full control over this behaviour, as KAFKA-7718 proposes, I hope we can address some of the randomness before then: * Inner-Join: Keep as is (use the headers of the triggering record) * Full-Join: Keep as is (use the headers of the triggering record) * Left-Join: *Always pick the headers of the left record.* * Right-Join: *Always pick the headers of the right record.* This behaviour would solve the most pressing issues when dealing with headers in Kafka Streams. *Motivation*: We depend on headers, especially when dealing with tombstone records. There is no other way to store additional information. If we do not use tombstone records, all default Kafka features around compacted topics and KTables are no longer usable. We are able to use custom Transformers to generate the headers (basically patching in the missing header support in Kafka Streams), but as soon as we use Join/Aggregate we lose control over the headers. -- This message was sent by Atlassian Jira (v8.3.2#803003)
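The header-selection rule proposed in this issue can be sketched as a single Java function. This is a hypothetical model, not Kafka Streams code: headers are simplified to `Map<String, String>`, and `JoinHeaderPolicy`, `JoinType`, `Side`, and `pickHeaders` are illustrative names. The Streams DSL itself exposes `join`, `leftJoin`, and `outerJoin`; the right-join case is modeled only because the proposal lists it.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical model of the header-selection rule proposed in KAFKA-8917. */
public class JoinHeaderPolicy {

    public enum JoinType { INNER, LEFT, RIGHT, FULL }

    public enum Side { LEFT, RIGHT }

    /**
     * Returns the headers the join result should carry.
     * Inner/full joins keep the current behaviour (headers of the triggering record);
     * left/right joins always use the headers of the preserved side.
     * A missing side (no matching record) is passed as null.
     */
    public static Map<String, String> pickHeaders(JoinType type,
                                                  Side triggeringSide,
                                                  Map<String, String> leftHeaders,
                                                  Map<String, String> rightHeaders) {
        Map<String, String> chosen;
        switch (type) {
            case LEFT:
                chosen = leftHeaders;
                break;
            case RIGHT:
                chosen = rightHeaders;
                break;
            default: // INNER and FULL: follow the triggering record, as today
                chosen = triggeringSide == Side.LEFT ? leftHeaders : rightHeaders;
        }
        return chosen == null ? Collections.<String, String>emptyMap() : chosen;
    }

    public static void main(String[] args) {
        Map<String, String> left = Collections.singletonMap("op", "delete");
        Map<String, String> right = Collections.singletonMap("op", "update");
        // The right-hand record triggers a left join, but the headers still come from the left.
        System.out.println(pickHeaders(JoinType.LEFT, Side.RIGHT, left, right)); // prints {op=delete}
    }
}
```

Making the left/right cases independent of which record triggered the join is what removes the "randomness" described above for the CDC enrichment use-case.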
[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer
[ https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931614#comment-16931614 ] Michał commented on KAFKA-8911: --- The test failure seems a bit random to me, but I cannot restart it. > Implicit TimeWindowedSerde creates Serde with null inner serializer > --- > > Key: KAFKA-8911 > URL: https://issues.apache.org/jira/browse/KAFKA-8911 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > > {{Serdes.scala}} contains an implicit def timeWindowedSerde as below: > > {code:java} > implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new > WindowedSerdes.TimeWindowedSerde[T]() > {code} > It creates a new {{TimeWindowedSerde}} without an inner serializer, which is a > bug. Even {{WindowedSerdes.java}} says that the empty constructor is for > reflection. > {code:java} > // Default constructor needed for reflection object creation > public TimeWindowedSerde() { > super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); > } > public TimeWindowedSerde(final Serde<T> inner) { > super(new TimeWindowedSerializer<>(inner.serializer()), new > TimeWindowedDeserializer<>(inner.deserializer())); > } > {code} > All of the above fails for me when I try to implicitly access the right Serde: > {code:java} > private val twSerde = implicitly[TimeWindowedSerde[String]] > {code} > and I have to create the object properly on my own > {code} > private val twSerde = new > WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]]) > {code} > It could be fixed with a proper call in {{Serdes.scala}} > {code} > implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): > WindowedSerdes.TimeWindowedSerde[T] = > new WindowedSerdes.TimeWindowedSerde[T](tSerde) > {code} > But maybe the scope of the default constructor for {{TimeWindowedSerde}} > should also be changed?
> BR, Michał -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931597#comment-16931597 ] Ryanne Dolan commented on KAFKA-7500: - [~chridtian.hagel] we try to be smart about what config properties are replicated and which are left as defaults: - we have a config property blacklist: https://github.com/apache/kafka/pull/6295/files#diff-1ae39c06c52ded296030121b13d4b791R33 - we don't replicate a config property that is inherited from the cluster default or from the static broker config, i.e. we only replicate properties that are explicitly set for a topic. - we don't replicate read-only or sensitive properties for obvious reasons. cleanup.policy is one that _should_ be replicated, generally, unless you are expecting the default value to be replicated. Also, be advised that config sync is only periodic, with a default interval of 10 minutes. > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
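The replication rules Ryanne lists can be read as a per-property filter. The Java sketch below models them under assumptions: the `Source` constants borrow three names from the Admin client's `ConfigEntry.ConfigSource`, while the `Entry` class, the `propertiesToSync` method, and the blacklist contents are hypothetical and illustrative only (the real blacklist is in the linked PR). It also shows why a `cleanup.policy` that is merely inherited from a default is not copied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical model of the MM2 topic-config filter described in the comment. */
public class TopicConfigSyncFilter {

    /** Where a value came from; constant names mirror the Admin client's ConfigEntry.ConfigSource. */
    public enum Source { DYNAMIC_TOPIC_CONFIG, STATIC_BROKER_CONFIG, DEFAULT_CONFIG }

    public static final class Entry {
        final String name;
        final String value;
        final Source source;
        final boolean readOnly;
        final boolean sensitive;

        public Entry(String name, String value, Source source, boolean readOnly, boolean sensitive) {
            this.name = name;
            this.value = value;
            this.source = source;
            this.readOnly = readOnly;
            this.sensitive = sensitive;
        }
    }

    /** Returns the names of the properties that would be copied to the target topic. */
    public static List<String> propertiesToSync(List<Entry> entries, Set<String> blacklist) {
        List<String> result = new ArrayList<>();
        for (Entry e : entries) {
            // Only values set explicitly on the topic count; inherited defaults are skipped.
            boolean explicitlySet = e.source == Source.DYNAMIC_TOPIC_CONFIG;
            if (explicitlySet && !blacklist.contains(e.name) && !e.readOnly && !e.sensitive) {
                result.add(e.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry("cleanup.policy", "compact", Source.DYNAMIC_TOPIC_CONFIG, false, false),
                new Entry("retention.ms", "604800000", Source.DEFAULT_CONFIG, false, false),
                new Entry("min.insync.replicas", "2", Source.DYNAMIC_TOPIC_CONFIG, false, false));
        // Illustrative blacklist only; not MM2's actual list.
        Set<String> blacklist = new HashSet<>(Arrays.asList("min.insync.replicas"));
        System.out.println(propertiesToSync(entries, blacklist)); // prints [cleanup.policy]
    }
}
```

Even a property that passes this filter only reaches the target topic on the next periodic sync, which defaults to every 10 minutes according to the comment.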
[jira] [Created] (KAFKA-8916) Unreliable kafka-reassign-partitions.sh affecting performance
VinayKumar created KAFKA-8916: - Summary: Unreliable kafka-reassign-partitions.sh affecting performance Key: KAFKA-8916 URL: https://issues.apache.org/jira/browse/KAFKA-8916 Project: Kafka Issue Type: Task Components: admin, config Affects Versions: 2.1.1 Environment: CentOS 7 Reporter: VinayKumar Currently I have a 3-node Kafka cluster, and I want to add 2 more nodes to make it a 5-node cluster. After adding the nodes to the cluster, I need all the topic partitions to be evenly distributed across all 5 nodes. In the past, when I ran kafka-reassign-partitions.sh and kafka-preferred-replica-election.sh, they ran for a very long time, hung, and made the cluster unstable, so I'm afraid to use this method. Can you please suggest the best and most reliable way to assign partitions across all the cluster nodes? -- This message was sent by Atlassian Jira (v8.3.2#803003)
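One way to make the expansion more predictable is to write the target assignment explicitly. The Java sketch below builds a round-robin plan in the JSON format accepted by `kafka-reassign-partitions.sh --reassignment-json-file` (`{"version":1,"partitions":[...]}`). The topic name, broker ids, partition count, and the `ReassignmentPlan` helper are hypothetical, and the round-robin layout is an assumption, not the algorithm used by the tool's own `--generate` mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper: round-robin reassignment plan in the kafka-reassign-partitions.sh JSON format. */
public class ReassignmentPlan {

    /**
     * Replica r of partition p goes to brokers[(p + r) % brokers.length], so the
     * first replica of each partition (the preferred leader) rotates across brokers.
     */
    public static List<int[]> roundRobin(int partitions, int replicationFactor, int[] brokers) {
        if (replicationFactor > brokers.length) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<int[]> plan = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            int[] replicas = new int[replicationFactor];
            for (int r = 0; r < replicationFactor; r++) {
                replicas[r] = brokers[(p + r) % brokers.length];
            }
            plan.add(replicas);
        }
        return plan;
    }

    /** Serializes the plan; topic names are limited to [a-zA-Z0-9._-], so no JSON escaping is needed. */
    public static String toJson(String topic, List<int[]> plan) {
        StringJoiner partitions = new StringJoiner(",", "[", "]");
        for (int p = 0; p < plan.size(); p++) {
            StringJoiner replicas = new StringJoiner(",", "[", "]");
            for (int broker : plan.get(p)) {
                replicas.add(Integer.toString(broker));
            }
            partitions.add("{\"topic\":\"" + topic + "\",\"partition\":" + p
                    + ",\"replicas\":" + replicas + "}");
        }
        return "{\"version\":1,\"partitions\":" + partitions + "}";
    }

    public static void main(String[] args) {
        int[] brokers = {1, 2, 3, 4, 5}; // hypothetical broker ids after the expansion
        System.out.println(toJson("orders", roundRobin(5, 3, brokers)));
    }
}
```

Applying such a plan one topic at a time with `--execute --throttle <bytes/sec>`, and checking progress with `--verify` (which also removes the throttle once the reassignment has completed), limits the replication traffic a large move puts on the brokers.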
[jira] [Commented] (KAFKA-8471) Replace control requests/responses with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931533#comment-16931533 ] ASF GitHub Bot commented on KAFKA-8471: --- ijuma commented on pull request #7353: KAFKA-8471: Replace control requests/responses with automated protocol URL: https://github.com/apache/kafka/pull/7353 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace control requests/responses with automated protocol > -- > > Key: KAFKA-8471 > URL: https://issues.apache.org/jira/browse/KAFKA-8471 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8577) Flaky Test `DistributedHerderTest.testJoinLeaderCatchUpFails`
[ https://issues.apache.org/jira/browse/KAFKA-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931525#comment-16931525 ] Warren Marivel commented on KAFKA-8577: --- Hi, I'm looking at contributing to Kafka and noticed that this test was fixed while searching for tickets to tackle. [https://github.com/apache/kafka/commit/11b25a13ee72e1c91facd0f94b888215f67d5a69] Should this Jira be marked as resolved? Thanks, Warren > Flaky Test `DistributedHerderTest.testJoinLeaderCatchUpFails` > - > > Key: KAFKA-8577 > URL: https://issues.apache.org/jira/browse/KAFKA-8577 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > Started seeing this regularly: > {code:java} > java.lang.AssertionError: > Unexpected method call WorkerGroupMember.maybeLeaveGroup("taking too long > to read the log"): > WorkerGroupMember.ensureActive(): expected: 2, actual: 1 > WorkerGroupMember.wakeup(): expected: 2, actual: 1 > WorkerGroupMember.maybeLeaveGroup("test join leader catch up fails"): > expected: 1, actual: 0 > WorkerGroupMember.requestRejoin(): expected: 1, actual: 0 > WorkerGroupMember.poll(): expected: 1, actual: 0{code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473 ] Christian Hagel edited comment on KAFKA-7500 at 9/17/19 1:50 PM: - [~ryannedolan] increasing the number of tasks and playing around with the producer.buffer.memory variable helped a lot to get rid of the above-mentioned error. However, I think I stumbled upon a bug with the topic config syncing. When starting MM2 with the following config: {code:java} { "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "", "replication.policy.separator": "", "target.cluster.alias": "B", "source.cluster.bootstrap.servers": "ip:9092", "target.cluster.bootstrap.servers": "ip:9093", "sync.topic.acls.enabled": "false", "replication.factor": "1", "internal.topic.replication.factor": "1", "topics": ".*", "enabled": "true", "rename.topics": "false", "replication.factor": 1, "refresh.topics": "true", "refresh.groups": "true", "sync.topic.configs": "true" } {code} it does create all topics from the source cluster in the target cluster, with the correct number of partitions. However, other config parameters such as cleanup.policy remain at the cluster default. At first I thought insufficient ACLs might be the cause, but I also reproduced the behavior with a simple docker-compose setup. Did I miss a configuration? Or is this the intended behavior? was (Author: chridtian.hagel): [~ryannedolan] increasing the number of tasks and playing around with the producer.buffer.memory variable helped a lot to get rid of the above-mentioned error. However, I think I stumbled upon a bug with the topic config syncing.
When starting MM2 with the following config: {code:java} { "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "", "replication.policy.separator": "", "target.cluster.alias": "B", "source.cluster.bootstrap.servers": "ip:9092", "target.cluster.bootstrap.servers": "ip:9093", "sync.topic.acls.enabled": "false", "replication.factor": "1", "internal.topic.replication.factor": "1", "topics": ".*", "enabled": "true", "rename.topics": "false", "replication.factor": 1, "refresh.topics": "true", "refresh.groups": "true", "sync.topic.configs": "true" } {code} it does create all topics from the source cluster in the target cluster, with the correct number of partitions. However, other config parameters such as cleanup.policy remain at the cluster default. At first I thought insufficient ACLs might be the cause, but I also reproduced the behavior with a simple docker-compose setup. Did I miss a configuration? Or is this the intended behavior? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473 ] Christian Hagel edited comment on KAFKA-7500 at 9/17/19 1:50 PM: - [~ryannedolan] increasing the number of tasks and playing around with the producer.buffer.memory variable helped a lot to get rid of the above-mentioned error. However, I think I stumbled upon a bug with the topic config syncing. When starting MM2 with the following config: {code:java} { "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "", "replication.policy.separator": "", "target.cluster.alias": "B", "source.cluster.bootstrap.servers": "ip:9092", "target.cluster.bootstrap.servers": "ip:9093", "sync.topic.acls.enabled": "false", "replication.factor": "1", "internal.topic.replication.factor": "1", "topics": ".*", "enabled": "true", "rename.topics": "false", "replication.factor": 1, "refresh.topics": "true", "refresh.groups": "true", "sync.topic.configs": "true" } {code} it does create all topics from the source cluster in the target cluster, with the correct number of partitions. However, other config parameters such as cleanup.policy remain at the cluster default. At first I thought insufficient ACLs might be the cause, but I also reproduced the behavior with a simple docker-compose setup. Did I miss a configuration? Or is this the intended behavior? was (Author: chridtian.hagel): [~ryannedolan] increasing the number of tasks and playing around with the producer.buffer.memory variable helped a lot to get rid of the above-mentioned error. However, I think I stumbled upon a bug with the topic config syncing.
When starting MM2 with the following config: {code:java} { "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "", "replication.policy.separator": "", "target.cluster.alias": "B", "source.cluster.bootstrap.servers": "ip:9092", "target.cluster.bootstrap.servers": "ip:9093", "sync.topic.acls.enabled": "false", "replication.factor": "1", "internal.topic.replication.factor": "1", "topics": ".*", "enabled": "true", "rename.topics": "false", "replication.factor": 1, "refresh.topics": "true", "refresh.groups": "true", "sync.topic.configs": "true" } {code} it does create all topics from the source cluster in the target cluster, with the correct number of partitions. However, other config parameters such as cleanup.policy remain at the cluster default. At first I thought insufficient ACLs might be the cause, but I also reproduced the behavior with a simple docker-compose setup. Did I miss a configuration? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473 ] Christian Hagel commented on KAFKA-7500: [~ryannedolan] increasing the number of tasks and playing around with the producer.buffer.memory variable helped a lot to get rid of the above-mentioned error. However, I think I stumbled upon a bug with the topic config syncing. When starting MM2 with the following config: {code:java} { "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "", "replication.policy.separator": "", "target.cluster.alias": "B", "source.cluster.bootstrap.servers": "ip:9092", "target.cluster.bootstrap.servers": "ip:9093", "sync.topic.acls.enabled": "false", "replication.factor": "1", "internal.topic.replication.factor": "1", "topics": ".*", "enabled": "true", "rename.topics": "false", "replication.factor": 1, "refresh.topics": "true", "refresh.groups": "true", "sync.topic.configs": "true" } {code} it does create all topics from the source cluster in the target cluster, with the correct number of partitions. However, other config parameters such as cleanup.policy remain at the cluster default. At first I thought insufficient ACLs might be the cause, but I also reproduced the behavior with a simple docker-compose setup. Did I miss a configuration? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931374#comment-16931374 ] Frederic Tardif commented on KAFKA-8523: I believe an option should be exposed to configure the behaviour, so that the transform either passes the null value on unmodified or applies the insert to an empty map. > InsertField transformation fails when encountering tombstone event > -- > > Key: KAFKA-8523 > URL: https://issues.apache.org/jira/browse/KAFKA-8523 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > When applying the {{InsertField}} transformation to a tombstone event, an > exception is raised: > {code} > org.apache.kafka.connect.errors.DataException: Only Map objects supported in > absence of schema for [field insertion], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138) > at > org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131) > at > org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128) > {code} > AFAICS, the transform can still be made to work in this case by simply > building up a new value map from scratch. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer
[ https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931314#comment-16931314 ] ASF GitHub Bot commented on KAFKA-8911: --- atais commented on pull request #7352: KAFKA-8911: Using proper WindowSerdes constructors in their implicit definitions URL: https://github.com/apache/kafka/pull/7352 Detailed info is available in the ticket: https://issues.apache.org/jira/browse/KAFKA-8911 Briefly, the `implicit defs` call the empty constructors, which exist only for reflective object creation. Therefore, when the implicit definitions are used, an NPE occurs as soon as the Serde is called. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implicit TimeWindowedSerde creates Serde with null inner serializer > --- > > Key: KAFKA-8911 > URL: https://issues.apache.org/jira/browse/KAFKA-8911 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > > {{Serdes.scala}} contains an implicit def timeWindowedSerde as below: > > {code:java} > implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new > WindowedSerdes.TimeWindowedSerde[T]() > {code} > It creates a new {{TimeWindowedSerde}} without an inner serializer, which is a > bug. Even {{WindowedSerdes.java}} says that the empty constructor is for > reflection.
> {code:java} > // Default constructor needed for reflection object creation > public TimeWindowedSerde() { > super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); > } > public TimeWindowedSerde(final Serde<T> inner) { > super(new TimeWindowedSerializer<>(inner.serializer()), new > TimeWindowedDeserializer<>(inner.deserializer())); > } > {code} > This fails for me when I try to access the right Serde implicitly: > {code:java} > private val twSerde = implicitly[TimeWindowedSerde[String]] > {code} > and I have to create the object properly on my own: > {code} > private val twSerde = new > WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]]) > {code} > It could be fixed with a proper call in {{Serdes.scala}}: > {code} > implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): > WindowedSerdes.TimeWindowedSerde[T] = > new WindowedSerdes.TimeWindowedSerde[T](tSerde) > {code} > But maybe the visibility of the default constructor for {{TimeWindowedSerde}} > should also be changed? > BR, Michał
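The failure mode in KAFKA-8911 can be reproduced without Kafka. A wrapper whose no-argument constructor leaves its inner serde unset only fails when it is first used. The classes below (`InnerCodec`, `WrappingCodec`, `WrappingCodecDemo`) are hypothetical stand-ins for `Serde<T>` and `TimeWindowedSerde<T>`, not the real Kafka types. They only show why the implicit should pass the inner serde through the one-argument constructor.

```java
import java.util.Objects;

// Hypothetical stand-in for an inner Serde<T>: turns a T into a String.
interface InnerCodec<T> {
    String encode(T value);
}

// Hypothetical stand-in for TimeWindowedSerde<T>: wraps an inner codec.
final class WrappingCodec<T> {
    private final InnerCodec<T> inner;

    // Mirrors the no-arg constructor kept for reflection: inner is left null.
    WrappingCodec() {
        this.inner = null;
    }

    // Mirrors the one-argument constructor that the proposed implicit would call.
    WrappingCodec(InnerCodec<T> inner) {
        this.inner = Objects.requireNonNull(inner, "inner");
    }

    String encodeWindowed(T key, long windowStart) {
        // With the no-arg constructor, dereferencing the null inner codec throws the NPE here.
        return inner.encode(key) + "@" + windowStart;
    }
}

final class WrappingCodecDemo {
    public static void main(String[] args) {
        InnerCodec<String> stringCodec = s -> s;
        WrappingCodec<String> proper = new WrappingCodec<>(stringCodec);
        System.out.println(proper.encodeWindowed("key", 1000L)); // key@1000

        WrappingCodec<String> broken = new WrappingCodec<>();
        try {
            broken.encodeWindowed("key", 1000L);
        } catch (NullPointerException e) {
            System.out.println("NPE from wrapper built with the no-arg constructor");
        }
    }
}
```

The `requireNonNull` check in the one-argument constructor moves the failure to construction time. That is one way to read the reporter's closing question about restricting the default constructor.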
[jira] [Created] (KAFKA-8915) Unable to modify partitions
lingyi.zhong created KAFKA-8915: --- Summary: Unable to modify partitions Key: KAFKA-8915 URL: https://issues.apache.org/jira/browse/KAFKA-8915 Project: Kafka Issue Type: Bug Reporter: lingyi.zhong [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 --replication-factor 1 --partitions 1 --topic test_topic3 WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "test_topic3". [root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partition 2 --topic test_topic3 Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option at joptsimple.OptionException.unrecognizedOption(OptionException.java:108) at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510) at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56) at joptsimple.OptionParser.parse(OptionParser.java:396) at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358) at kafka.admin.TopicCommand$.main(TopicCommand.scala:44) at kafka.admin.TopicCommand.main(TopicCommand.scala)
[jira] [Updated] (KAFKA-8910) Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion
[ https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Ushakov updated KAFKA-8910: -- Description: h1. Problem Javadoc for org.apache.kafka.clients.producer.Callback states: {noformat} * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error *occurred. {noformat} In fact, metadata is never null since KAFKA-6180. See org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion for details. was: h1. Problem Javadoc for org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion states: {noformat} * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error *occurred. {noformat} In fact, metadata is never null since KAFKA-6180. See org.apache.kafka.clients.producer.Callback for details. > Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion > --- > > Key: KAFKA-8910 > URL: https://issues.apache.org/jira/browse/KAFKA-8910 > Project: Kafka > Issue Type: Bug >Reporter: Sergey Ushakov >Assignee: Lee Dongjin >Priority: Major > > h1. Problem > Javadoc for org.apache.kafka.clients.producer.Callback states: > {noformat} > * @param metadata The metadata for the record that was sent (i.e. the > partition and offset). Null if an error > *occurred. {noformat} > In fact, metadata is never null since KAFKA-6180. See > org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion > for details.
[jira] [Commented] (KAFKA-8584) Allow "bytes" type to generate a ByteBuffer rather than byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-8584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931154#comment-16931154 ] Tom Bentley commented on KAFKA-8584: I for one would like to see this discussed in a KIP, even if it doesn't change the serialized form and therefore isn't really a public API in the strictest sense. > Allow "bytes" type to generate a ByteBuffer rather than byte arrays > > > Key: KAFKA-8584 > URL: https://issues.apache.org/jira/browse/KAFKA-8584 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Nikolay Izhikov >Priority: Major > Labels: newbie > > Right now in the RPC definition, type {{bytes}} is translated into > {{byte[]}} in generated Java code. However, for some requests like > ProduceRequest#partitionData, it would be better for the underlying type to be a > ByteBuffer rather than a byte array. > One proposal is to add an additional boolean tag {{useByteBuffer}} for the > {{bytes}} type, which is false by default; when set to {{true}}, the > corresponding field is generated as {{ByteBuffer}} instead of {{byte[]}}.
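The motivation behind KAFKA-8584 can be illustrated with plain `java.nio`. A `ByteBuffer` can expose a slice of a larger buffer, such as a request read off the network, without copying. Producing a standalone `byte[]` for the same bytes requires a copy. The class name and offsets below are arbitrary and hypothetical. This only illustrates the trade-off and is not the generated RPC code or the proposed `useByteBuffer` tag.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BytesVsByteBuffer {
    private BytesVsByteBuffer() {}

    // byte[] field: the bytes must be copied out of the request buffer.
    static byte[] asByteArray(ByteBuffer request, int offset, int length) {
        byte[] copy = new byte[length];
        ByteBuffer dup = request.duplicate(); // same content, independent position/limit
        dup.position(offset);
        dup.get(copy, 0, length);
        return copy;
    }

    // ByteBuffer field: a view over the same memory, no copy.
    static ByteBuffer asByteBufferView(ByteBuffer request, int offset, int length) {
        ByteBuffer dup = request.duplicate();
        dup.position(offset);
        dup.limit(offset + length);
        return dup.slice();
    }

    public static void main(String[] args) {
        byte[] wire = "HEADERrecords".getBytes(StandardCharsets.UTF_8);
        ByteBuffer request = ByteBuffer.wrap(wire);

        byte[] copied = asByteArray(request, 6, 7);
        ByteBuffer view = asByteBufferView(request, 6, 7);

        // Changing the underlying bytes is visible through the view but not the copy,
        // showing that the view shares memory while the array does not.
        wire[6] = (byte) 'R';
        System.out.println(new String(copied, StandardCharsets.UTF_8)); // records
        System.out.println((char) view.get(0));                         // R
    }
}
```

Both helpers work on a `duplicate()`, so the caller's buffer position is left untouched.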
[jira] [Created] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
Igamr Palsenberg created KAFKA-8914: --- Summary: Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error Key: KAFKA-8914 URL: https://issues.apache.org/jira/browse/KAFKA-8914 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.3.0 Reporter: Igamr Palsenberg Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any value (I tried valid numbers and just some string values) results in: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) Kafka broker: Kafka 2.3.0, installed using Brew. Default config, except for the bind IP, which is set explicitly to localhost.
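For reference, the setup described in KAFKA-8914 amounts to a consumer with static membership enabled. Static membership was introduced with KIP-345 in Kafka 2.3.0. The sketch below only builds the `Properties` and uses literal config keys, so it compiles without the Kafka client on the classpath. `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG` resolves to `"group.instance.id"`. The class name, group id, instance id and the default port 9092 on localhost are assumptions, not values taken from the report.

```java
import java.util.Properties;

final class StaticMembershipConfigSketch {
    private StaticMembershipConfigSketch() {}

    // Builds consumer properties with static membership enabled.
    // Literal keys are used here; real client code would normally use the ConsumerConfig constants.
    static Properties consumerProps(String groupId, String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG (port assumed)
        props.put("group.id", groupId);                   // ConsumerConfig.GROUP_ID_CONFIG
        props.put("group.instance.id", instanceId);       // ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Passing such properties to a `KafkaConsumer` and calling `poll` is the path on which the reported `SchemaException` surfaces, according to the stack trace above.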