[jira] [Commented] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error

2019-09-17 Thread Igamr Palsenberg (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932056#comment-16932056
 ] 

Igamr Palsenberg commented on KAFKA-8914:
-

The broker is 2.3.0 according to brew. I didn't do any configuration on the 
broker side, except for explicitly setting the bind address.

> Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
> --
>
> Key: KAFKA-8914
> URL: https://issues.apache.org/jira/browse/KAFKA-8914
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Igamr Palsenberg
>Priority: Major
>
>  
> Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any 
> value (I tried valid numbers and just some string values) will result in: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException at 
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  
> Kafka broker: 
> Kafka 2.3.0, installed using Brew. Default config, except for the bind IP, 
> which is set explicitly to localhost.
>  
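For context, a minimal consumer setup of the kind described above would look roughly 
like the following. This is a sketch only; the bootstrap address, group id, instance 
id, and topic name are placeholders, and the failure shown in the comments is the one 
reported, not guaranteed to reproduce on every setup.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        // The setting this report is about: a static member id (KIP-345).
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("repro-topic"));
            // On an affected setup the reported SchemaException surfaces from poll(),
            // while the group assignment is deserialized after the join completes.
            consumer.poll(Duration.ofSeconds(5));
        }
    }
}
{code}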



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-8532:
-
Affects Version/s: 2.3.0

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1, 2.3.0
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:27 AM:
---

[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled by the controller-event-thread.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.
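For illustration only, the pattern the comment is pointing at (waiting on the latch 
with a bounded timeout instead of an unconditional await, so the caller can fail 
rather than park forever) looks like this in isolation. This is a standalone sketch, 
not Kafka code and not the fix that was eventually adopted; the timeout value and 
names are made up.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedAwaitSketch {
    // Wait for the latch, but give up after timeoutMs instead of blocking forever.
    static void awaitOrFail(CountDownLatch latch, long timeoutMs) throws InterruptedException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("responses not received within " + timeoutMs + " ms");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A latch that is never counted down simulates ZooKeeper responses that never arrive.
        CountDownLatch latch = new CountDownLatch(1);
        try {
            awaitOrFail(latch, 100);
        } catch (IllegalStateException e) {
            System.out.println("gave up waiting: " + e.getMessage());
        }
    }
}
{code}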


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---

[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---

[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:24 AM:
---

[~junrao]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.


was (Author: lbdai3190):
My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo commented on KAFKA-8532:
--

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:25 AM:
---

[~junrao], [~ijuma]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao]

My analysis: 

      When Kafka is disconnected from ZooKeeper, the controller-event-thread is 
handling an ISRChangeNotification event; because Kafka is disconnected from zk, 
controller event handling is blocked in the retryRequestsUntilConnected -> 
handleRequests path.

     At this time, the zk-session-expiry-handler thread is trying to 
reinitialize a new zk connection with ZooKeeper, so it puts a controller event 
named Expire into the ControllerEventManager LinkedBlockingQueue. Because the 
controller-event-thread is blocked on the ISRChangeNotification event, the 
Expire event will not be handled.

    The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be 
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Updated] (KAFKA-8915) Unable to modify partition

2019-09-17 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8915:
---
Summary: Unable to modify partition  (was: 无法修改partition)

> Unable to modify partition
> --
>
> Key: KAFKA-8915
> URL: https://issues.apache.org/jira/browse/KAFKA-8915
> Project: Kafka
>  Issue Type: Bug
>Reporter: lingyi.zhong
>Priority: Major
>
> [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 
> --replication-factor 1 --partitions 1 --topic test_topic3
> [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 
> --replication-factor 1 --partitions 1 --topic test_topic3
> WARNING: Due to limitations in metric names, topics with a period ('.') or 
> underscore ('_') could collide. To avoid issues it is best to use either, but 
> not both.
> Created topic "test_topic3".
> [root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot 
> --partition 2 --topic test_topic3
> Exception in thread "main" joptsimple.UnrecognizedOptionException: partition 
> is not a recognized option at 
> joptsimple.OptionException.unrecognizedOption(OptionException.java:108) at 
> joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510) at 
> joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56) at 
> joptsimple.OptionParser.parse(OptionParser.java:396) at 
> kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358) 
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:44) at 
> kafka.admin.TopicCommand.main(TopicCommand.scala)
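The error itself comes from the option name: kafka-topics.sh defines --partitions 
(plural), while the alter invocation above passes --partition. A corrected command, 
keeping the reporter's ZooKeeper connect string as-is, would be:

{noformat}
bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partitions 2 --topic test_topic3
{noformat}

Whether the /chroot path matches the ensemble the topic was created on 
(10.20.30.77:2181, without a chroot) is a separate question.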



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932006#comment-16932006
 ] 

leibo commented on KAFKA-8532:
--

Hi [~junrao], the problem has happened again; the cluster jstack dump logs 
were uploaded.

Please help to analyze this problem, thanks.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 

[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-8532:
-
Attachment: js2.log

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-8532:
-
Attachment: js1.log

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-8532:
-
Attachment: js0.log

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> 

[jira] [Commented] (KAFKA-8104) Consumer cannot rejoin to the group after rebalancing

2019-09-17 Thread Charles Qian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931990#comment-16931990
 ] 

Charles Qian commented on KAFKA-8104:
-

Same issue with kafka-client 2.2.0.

It happens in a large topology, e.g. where each topic has more than 16 
partitions, so that for each topic the client needs more than 16 consumer 
threads.

> Consumer cannot rejoin to the group after rebalancing
> -
>
> Key: KAFKA-8104
> URL: https://issues.apache.org/jira/browse/KAFKA-8104
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
>Reporter: Gregory Koshelev
>Priority: Critical
> Attachments: consumer-rejoin-fail.log
>
>
> TL;DR: {{KafkaConsumer}} cannot rejoin the group due to an inconsistency between 
> {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}}) and 
> {{AbstractCoordinator.joinFuture}} (which is a succeeded {{RequestFuture}}). 
> See explanation below.
> There are 16 consumers in single process (threads from pool-4-thread-1 to 
> pool-4-thread-16). All of them belong to single consumer group 
> {{hercules.sink.elastic.legacy_logs_elk_c2}}. Rebalancing has been acquired 
> and consumers have got {{CommitFailedException}} as expected:
> {noformat}
> 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN  
> r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured max.poll.interval.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
>   at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
>   at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> After that, most of them successfully rejoined to the group with generation 
> 10699:
> {noformat}
> 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group 
> with generation 10699
> 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-18]
> ...
> 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group 
> with generation 10699
> 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
> ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-24]
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed 
> since group is rebalancing
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed 
> since group is rebalancing
> 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer 

[jira] [Commented] (KAFKA-8086) Flaky Test GroupAuthorizerIntegrationTest#testPatternSubscriptionWithTopicAndGroupRead

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931950#comment-16931950
 ] 

ASF GitHub Bot commented on KAFKA-8086:
---

guozhangwang commented on pull request #7356: KAFKA-8086: Use 1 partition for 
offset topic when possible
URL: https://github.com/apache/kafka/pull/7356
 
 
   I realized some flaky tests failed at `setup` or in calls that try to create 
offset topics, and I think using one partition and one replica would be 
sufficient in these cases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test 
> GroupAuthorizerIntegrationTest#testPatternSubscriptionWithTopicAndGroupRead
> --
>
> Key: KAFKA-8086
> URL: https://issues.apache.org/jira/browse/KAFKA-8086
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/62/testReport/junit/kafka.api/GroupAuthorizerIntegrationTest/testPatternSubscriptionWithTopicAndGroupRead/]
> {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata 
> not propagated after 15000 ms at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:381) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at 
> kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at 
> scala.collection.immutable.Range.foreach(Range.scala:158) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:237) at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:230) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:108) at 
> kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at 
> kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at 
> kafka.api.AuthorizerIntegrationTest.setUp(AuthorizerIntegrationTest.scala:242){quote}
> STDOUT
> {quote}[2019-03-09 08:40:34,220] ERROR [KafkaApi-0] Error when handling 
> request: clientId=0, correlationId=0, api=UPDATE_METADATA, 
> body=\{controller_id=0,controller_epoch=1,broker_epoch=25,topic_states=[],live_brokers=[{id=0,end_points=[{port=41020,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  (kafka.server.KafkaApis:76) 
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=0, connectionId=127.0.0.1:41020-127.0.0.1:52304-0, 
> session=Session(Group:testGroup,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized. [2019-03-09 08:40:35,336] ERROR [Consumer 
> clientId=consumer-98, groupId=my-group] Offset commit failed on partition 
> topic-0 at offset 5: Not authorized to access topics: [Topic authorization 
> failed.] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:812) 
> [2019-03-09 08:40:35,336] ERROR [Consumer clientId=consumer-98, 
> groupId=my-group] Not authorized to commit to topics [topic] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:850) 
> [2019-03-09 08:40:41,649] ERROR [KafkaApi-0] Error when handling request: 
> clientId=0, correlationId=0, api=UPDATE_METADATA, 
> body=\{controller_id=0,controller_epoch=1,broker_epoch=25,topic_states=[],live_brokers=[{id=0,end_points=[{port=36903,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  (kafka.server.KafkaApis:76) 
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=0, connectionId=127.0.0.1:36903-127.0.0.1:44978-0, 
> session=Session(Group:testGroup,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized. [2019-03-09 08:40:53,898] ERROR [KafkaApi-0] 
> Error when handling request: clientId=0, correlationId=0, 
> api=UPDATE_METADATA, 
> 

[jira] [Resolved] (KAFKA-8919) Flaky Test kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess

2019-09-17 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8919.
--
Resolution: Fixed

Realized it was fixed as part of the polling (1L) change in 
https://github.com/apache/kafka/pull/7312

> Flaky Test 
> kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess
> 
>
> Key: KAFKA-8919
> URL: https://issues.apache.org/jira/browse/KAFKA-8919
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> {code}
> Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:830)
>   at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:795)
>   at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1325)
>   at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1333)
>   at 
> kafka.api.AuthorizerIntegrationTest.consumeRecords(AuthorizerIntegrationTest.scala:1772)
>   at 
> kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess(AuthorizerIntegrationTest.scala:813)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at jdk.internal.reflect.GeneratedMethodAccessor110.invoke(Unknown 
> Source)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> 

[jira] [Commented] (KAFKA-8913) Document topic based configs & ISR settings for Streams apps

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931936#comment-16931936
 ] 

ASF GitHub Bot commented on KAFKA-8913:
---

mjsax commented on pull request #7346: KAFKA-8913: Document topic based configs 
& ISR settings for Streams apps
URL: https://github.com/apache/kafka/pull/7346
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document topic based configs & ISR settings for Streams apps
> 
>
> Key: KAFKA-8913
> URL: https://issues.apache.org/jira/browse/KAFKA-8913
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>
> Noticed that it was not clear how to configure the internal topics.
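
For reference, internal topic configuration is typically passed from the Streams 
app itself via the {{topic.}} prefix; a minimal sketch (assuming the standard 
{{StreamsConfig}} helpers; the specific settings shown are illustrative only):

{code:java}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicConfigExample {

    // Builds Streams properties; any config under the "topic." prefix is used as a
    // default when Streams creates its internal (repartition/changelog) topics.
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker

        // Topic-level settings applied to internal topics, e.g. retention and min ISR.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "86400000");
        props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), "2");

        // Replication factor of internal topics is a top-level Streams config.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        return props;
    }
}
{code}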



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931931#comment-16931931
 ] 

Matthias J. Sax commented on KAFKA-7499:


Alaa, thanks for looking into this. I'll reply on the mailing list – the 
discussion is supposed to happen there, and not on the ticket.

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Alaa Zbair
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exception that are raised in the producer callback.
> However, serialization happens before the data is handed to the producer with 
> Kafka Streams itself and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exception, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
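
For orientation, a hedged sketch of the existing write-path handler introduced by 
KIP-210 (this shows only the current {{ProductionExceptionHandler}} callback-path 
API; the serialization-path extension proposed by this ticket is not shown):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Continues past records rejected by the broker as too large;
// every other producer-callback error still fails the task.
public class SkipOversizedRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}
{code}

Such a handler is registered via the {{default.production.exception.handler}} 
Streams config; note it is only invoked for send-callback errors today, which is 
exactly the gap this ticket is about.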



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8595) Support SerDe of Decimals in JSON that are not HEX encoded

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931926#comment-16931926
 ] 

ASF GitHub Bot commented on KAFKA-8595:
---

agavra commented on pull request #7354: KAFKA-8595: Support deserialization of 
decimals encoded in NUMERIC fo…
URL: https://github.com/apache/kafka/pull/7354
 
 
   see 
[KIP-481](https://cwiki.apache.org/confluence/display/KAFKA/KIP-481%3A+SerDe+Improvements+for+Connect+Decimal+type+in+JSON)
 for more details
   
   Review Guide:
   - Added `DecimalFormat` enum to represent different JSON decimal node types 
and corresponding config in `JsonConverterConfig`
   - Split `LogicalTypeConverter` to have two strongly typed methods (`toJson` 
and `toConnect`) so that we could group SerDe methods together (this caused 
some code to move around because I could combine `TO_JSON_LOGICAL_CONVERTERS` 
and `TO_CONNECT_LOGICAL_CONVERTERS`)
   - Implemented the SerDe in the line containing 
`LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter()`
   - Added `CaseInsensitiveValidString` to the validators
   
   Unit tests covering all of the formats are added.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support SerDe of Decimals in JSON that are not HEX encoded
> --
>
> Key: KAFKA-8595
> URL: https://issues.apache.org/jira/browse/KAFKA-8595
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Major
>
> Most JSON data that utilizes precise decimal data represents it as a decimal 
> string. Kafka Connect, on the other hand, only supports a binary HEX string 
> encoding (see example below). We should support deserialization and 
> serialization for any of the following types:
> {code:java}
> {
>   "asHex": "D3J5",
>   "asString": "10.12345"
>   "asNumber": 10.2345
> }{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8919) Flaky Test kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess

2019-09-17 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-8919:


 Summary: Flaky Test 
kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess
 Key: KAFKA-8919
 URL: https://issues.apache.org/jira/browse/KAFKA-8919
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Guozhang Wang


{code}
Stacktrace
org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout 
instead of the expected 1 records
at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions.fail(Assertions.scala:1091)
at org.scalatest.Assertions.fail$(Assertions.scala:1087)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:830)
at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:795)
at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1325)
at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1333)
at 
kafka.api.AuthorizerIntegrationTest.consumeRecords(AuthorizerIntegrationTest.scala:1772)
at 
kafka.api.AuthorizerIntegrationTest.testSimpleConsumeWithOffsetLookupAndNoGroupAccess(AuthorizerIntegrationTest.scala:813)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at jdk.internal.reflect.GeneratedMethodAccessor110.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at 

[jira] [Updated] (KAFKA-8918) Flaky Test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation

2019-09-17 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8918:
-
Labels: flaky-test  (was: )

> Flaky Test 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation
> 
>
> Key: KAFKA-8918
> URL: https://issues.apache.org/jira/browse/KAFKA-8918
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 15000. Timed out 
> waiting for expected tasks 
> {"foo":{"id":"foo","taskState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"cancelled":true,"status":{"node01":"done","node02":"done"}},"workerState":{"state":"DONE","taskId":"foo","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"status":"done"}}}
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336)
>   at 
> org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:144)
>   at 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation(CoordinatorTest.java:264)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8575) Investigate cleaning up task suspension (part 8)

2019-09-17 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931832#comment-16931832
 ] 

Sophie Blee-Goldman commented on KAFKA-8575:


Partly tackled by the first PR for KAFKA-8510 – it removes suspension of standby 
tasks (for all protocols). Active tasks that are revoked will first go through 
the process of being suspended and then be closed if not reassigned. If we decide 
to remove eager rebalancing, we can further clean that up to just close them and 
get rid of all suspension.

> Investigate cleaning up task suspension (part 8)
> 
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (KAFKA-5635) KIP-181 Kafka-Connect integrate with kafka ReST Proxy

2019-09-17 Thread Ewen Cheslack-Postava (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava closed KAFKA-5635.


> KIP-181 Kafka-Connect integrate with kafka ReST Proxy
> -
>
> Key: KAFKA-5635
> URL: https://issues.apache.org/jira/browse/KAFKA-5635
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Dhananjay Patkar
>Priority: Major
>  Labels: features, newbie
>
> Kafka connect currently uses kafka clients which directly connect to kafka 
> brokers. 
> In a use case wherein I have many kafka connect [producers] running remotely, 
> it's a challenge to configure broker information on every connect agent.
> Also, in case of an IP change [upgrade or cluster re-creation], we need to 
> update every remote connect configuration.
> If kafka connect source connectors talk to a ReST endpoint, then the client is 
> unaware of broker details. This way we can transparently upgrade / re-create 
> the kafka cluster as long as the ReST endpoint remains the same.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8603) Document upgrade path

2019-09-17 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8603:
--

Assignee: Sophie Blee-Goldman

> Document upgrade path
> -
>
> Key: KAFKA-8603
> URL: https://issues.apache.org/jira/browse/KAFKA-8603
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Users need to follow a specific upgrade path in order to smoothly and safely 
> perform a live upgrade. We should very clearly document the steps needed to 
> upgrade a Consumer and a Streams app (note they will be different)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8918) Flaky Test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation

2019-09-17 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8918:
-
Component/s: core

> Flaky Test 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation
> 
>
> Key: KAFKA-8918
> URL: https://issues.apache.org/jira/browse/KAFKA-8918
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 15000. Timed out 
> waiting for expected tasks 
> {"foo":{"id":"foo","taskState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"cancelled":true,"status":{"node01":"done","node02":"done"}},"workerState":{"state":"DONE","taskId":"foo","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"status":"done"}}}
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336)
>   at 
> org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:144)
>   at 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation(CoordinatorTest.java:264)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8902) Benchmark cooperative vs eager rebalancing

2019-09-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-8902:
---

Assignee: John Roesler

> Benchmark cooperative vs eager rebalancing
> --
>
> Key: KAFKA-8902
> URL: https://issues.apache.org/jira/browse/KAFKA-8902
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Major
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from 
> (https://issues.apache.org/jira/browse/KAFKA-8609)):
> ** accumulated rebalance time
> Cluster/topic sizing:
> ** 10 instances
> ** 100 tasks (each instance gets 10 tasks)
> ** 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be 
> interesting)
> Expand (from 9 to 10)
> Contract (from 10 to 9)
> With and without saturation:
> EOS:
> * with and without
> Topology:
> * stateful
> * windowed agg
> Key Parameterizations:
> 1. control: no rebalances
> 2. rolling without state loss faster than session timeout
> 3. expand 9 to 10
> 4. contract 10 to 9



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8902) Benchmark cooperative vs eager rebalancing

2019-09-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8902:

Description: 
Cause rebalance and measure:
* overall throughput
* paused time
* (also look at the metrics from 
(https://issues.apache.org/jira/browse/KAFKA-8609)):
** accumulated rebalance time

Cluster/topic sizing:
** 10 instances
** 100 tasks (each instance gets 10 tasks)
** 1000 stores (each task gets 10 stores)
* standbys = [0 and 1]

Rolling bounce:
* with and without state loss
* shorter and faster than session timeout (shorter in particular should be 
interesting)

Expand (from 9 to 10)

Contract (from 10 to 9)

With and without saturation:

EOS:
* with and without

Topology:
* stateful
* windowed agg

Key Parameterizations:
1. control: no rebalances
2. rolling without state loss faster than session timeout
3. expand 9 to 10
4. contract 10 to 9



> Benchmark cooperative vs eager rebalancing
> --
>
> Key: KAFKA-8902
> URL: https://issues.apache.org/jira/browse/KAFKA-8902
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from 
> (https://issues.apache.org/jira/browse/KAFKA-8609)):
> ** accumulated rebalance time
> Cluster/topic sizing:
> ** 10 instances
> ** 100 tasks (each instance gets 10 tasks)
> ** 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be 
> interesting)
> Expand (from 9 to 10)
> Contract (from 10 to 9)
> With and without saturation:
> EOS:
> * with and without
> Topology:
> * stateful
> * windowed agg
> Key Parameterizations:
> 1. control: no rebalances
> 2. rolling without state loss faster than session timeout
> 3. expand 9 to 10
> 4. contract 10 to 9



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8496) Add system test for compatibility and upgrade path (part 6)

2019-09-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-8496:
---

Assignee: Bill Bejeck

> Add system test for compatibility and upgrade path (part 6)
> ---
>
> Key: KAFKA-8496
> URL: https://issues.apache.org/jira/browse/KAFKA-8496
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>Priority: Major
>
> tests:
> * compatibility
> * upgrade (follow the existing upgrade pattern)
> * make sure we don't get stuck in rebalance loops (eventually get to some 
> agreed generation, verify we don't trigger more rebalances after agreed 
> generation)
> acceptance:
> * verify the partition assignment is correct
> * verify that Streams returns to processing



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8496) Add system test for compatibility and upgrade path (part 6)

2019-09-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8496:

Description: 
tests:
* compatibility
* upgrade (follow the existing upgrade pattern)
* make sure we don't get stuck in rebalance loops (eventually get to some 
agreed generation, verify we don't trigger more rebalances after agreed 
generation)

acceptance:
* verify the partition assignment is correct
* verify that Streams returns to processing

> Add system test for compatibility and upgrade path (part 6)
> ---
>
> Key: KAFKA-8496
> URL: https://issues.apache.org/jira/browse/KAFKA-8496
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>
> tests:
> * compatibility
> * upgrade (follow the existing upgrade pattern)
> * make sure we don't get stuck in rebalance loops (eventually get to some 
> agreed generation, verify we don't trigger more rebalances after agreed 
> generation)
> acceptance:
> * verify the partition assignment is correct
> * verify that Streams returns to processing



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8609) Add consumer metrics for rebalances (part 9)

2019-09-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-8609:
---

Assignee: Guozhang Wang  (was: Sophie Blee-Goldman)

> Add consumer metrics for rebalances (part 9)
> 
>
> Key: KAFKA-8609
> URL: https://issues.apache.org/jira/browse/KAFKA-8609
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
>
> We would like to track some additional metrics on the consumer side related 
> to rebalancing as part of this KIP, including
>  # listener callback latency
>  ## partitions-revoked-time-avg
>  ## partitions-revoked-time-max
>  ## partitions-assigned-time-avg
>  ## partitions-assigned-time-max
>  ## partitions-lost-time-avg
>  ## partitions-lost-time-max
>  # rebalance rate (# rebalances per day)
>  ## rebalance-rate-per-day



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Frederic Tardif (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931777#comment-16931777
 ] 

Frederic Tardif commented on KAFKA-8523:


Just to clarify my comment above: the `behavior.on.null.values` config already 
exists (at least on the elasticsearch connector), so it is important to keep the 
null values when we aim to interpret them as a `delete` on an elasticsearch index 
entry. Applying a transform that inserts fields on null would completely defeat 
the purpose in this scenario. I strongly believe the transform should skip null 
values, or at least that this behaviour should be configurable.

 

!image-2019-09-17-15-53-44-038.png!

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Frederic Tardif (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931777#comment-16931777
 ] 

Frederic Tardif edited comment on KAFKA-8523 at 9/17/19 7:58 PM:
-

Just to clarify my comment above: the `behavior.on.null.values` config already 
exists (at least on the elasticsearch connector), so it is important to keep the 
null values when we aim to interpret them as a `delete` on an elasticsearch index 
entry. Applying a transform that inserts fields on null would completely defeat 
the purpose in this scenario. I strongly believe the transform should skip null 
values, or at least that this behaviour should be configurable.

 

!image-2019-09-17-15-53-44-038.png|width=693,height=130!


was (Author: frederic.tardif):
just to clarify my comment above. The `behavior.on.null.values` already exists 
(at least on the elasticsearch connector), so it is important to keep the null 
values when we aim to interpret it as `delete` on an elasticsearch index entry. 
Applying a transform that InsertFields on null would completely defies the 
purpose in this scenario. I strongly believe the transform should skip null 
values or this transform behaviour should at least be configurable.  

 

!image-2019-09-17-15-53-44-038.png!

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931776#comment-16931776
 ] 

Ryanne Dolan commented on KAFKA-7500:
-

[~qihong] That's all correct :)

In fact, KIP-382 mentions a "primary" cluster (in your case "C") and a single 
Connect cluster being used to replicate data between any number of other 
clusters. One way to do this is with a SinkConnector (part of the KIP and 
coming later) s.t. records travel _through_ the primary cluster to other 
clusters. I believe you could also override the Connect Worker's internal 
Producer config to write directly to another cluster, s.t., as you say, state 
is stored in one cluster but records go to another. I've never tried that, 
ymmv, but I suspect it'd work as you've described.

Notice that, generally, you will end up with multiple Connect clusters anyway 
-- one in each DC -- for performance and HA reasons. At that point you are back 
to managing multiple Connectors on multiple Connect clusters. MM2's top-level 
driver manages that complexity for you by automatically spinning up and 
configuring all the required Workers.

re a REST API, the MM2 driver essentially turns off the REST service inside 
each Herder. This is because the current Connect REST API doesn't support 
having multiple Herders or Worker configs, so we'd need to sorta abuse the 
Connect REST API to get it to work. However, there was much discussion around 
KIP-382 re an MM2 REST API, and there are several good ideas floating around. 
These were ultimately deferred, but not ruled out.

Also, fwiw, I have successfully run an MM2 cluster with the Connect REST API 
turned on, with each Herder's endpoints wrapped in a higher-level API. I have 
done this successfully with a reverse proxy as well as with a fork of Connect's 
RestServer. This enables you to start/stop/configure individual connectors 
within the MM2 cluster, if you so wish. 

And finally: MM2 is very pluggable. For example, you can drop in a TopicFilter 
that grabs dynamic whitelists from somewhere. I happen to know this works very 
well :)
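
A rough, hypothetical sketch of what such a dynamic-whitelist filter could look 
like (the interface below is an assumption loosely modeled on MM2's {{TopicFilter}} 
plugin point described in KIP-382; the real names and signatures may differ, and 
the "whitelist.topics" property is made up for the example):

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Assumed plugin shape, loosely mirroring MM2's TopicFilter extension point
// from KIP-382 (the real interface name/signature may differ).
interface TopicFilter {
    void configure(Map<String, ?> props);
    boolean shouldReplicateTopic(String topic);
}

// Hypothetical filter that only replicates topics present in a whitelist that
// can be refreshed dynamically (e.g. by a background thread polling a service).
public class DynamicWhitelistTopicFilter implements TopicFilter {

    private final Set<String> whitelist = ConcurrentHashMap.newKeySet();

    @Override
    public void configure(final Map<String, ?> props) {
        // "whitelist.topics" is a hypothetical property used to seed the set.
        final Object seed = props.get("whitelist.topics");
        if (seed != null) {
            for (final String topic : seed.toString().split(",")) {
                whitelist.add(topic.trim());
            }
        }
    }

    @Override
    public boolean shouldReplicateTopic(final String topic) {
        return whitelist.contains(topic);
    }
}
{code}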

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Frederic Tardif (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederic Tardif updated KAFKA-8523:
---
Attachment: image-2019-09-17-15-53-44-038.png

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
> Attachments: image-2019-09-17-15-53-44-038.png
>
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8912) TimeoutException cause is insufficiently documented for the KafkaProducer

2019-09-17 Thread Ramkishan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramkishan updated KAFKA-8912:
-
Affects Version/s: 2.3.0

> TimeoutException cause is insufficiently documented for the KafkaProducer
> -
>
> Key: KAFKA-8912
> URL: https://issues.apache.org/jira/browse/KAFKA-8912
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, producer 
>Affects Versions: 1.0.1, 2.3.0
>Reporter: Ramkishan
>Priority: Major
>
> The javadocs of the org.apache.kafka.clients.producer.KafkaProducer class have 
> only a partially articulated description for the TimeoutException, wherever 
> applicable.
> The document reads - 
> "{{[TimeoutException|https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]}}
>  - If the time taken for fetching metadata or allocating memory for the 
> record has surpassed {{max.block.ms}}."
> However, this exception can also be thrown if the RTM occurs while the message 
> is in the accumulator and the batch expires. The current description is 
> misleading for developers who try to optimize the producer config when they 
> face this issue.
> [https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]
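
For context, a minimal sketch of the producer timeouts involved (the values shown 
are the defaults and purely illustrative): {{max.block.ms}} bounds the blocking 
case the javadoc currently describes, while batch expiry in the accumulator is 
governed by {{delivery.timeout.ms}} since 2.1.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerTimeoutExample {

    public static KafkaProducer<String, String> producer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // send() blocks at most this long on metadata fetch / buffer allocation;
        // exceeding it is the case the javadoc currently documents.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);

        // A batch waiting in the accumulator can also expire: since 2.1 the overall
        // bound is delivery.timeout.ms (must be >= linger.ms + request.timeout.ms),
        // and expiry surfaces as a TimeoutException in the send callback.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

        return new KafkaProducer<>(props);
    }
}
{code}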



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8912) TimeoutException cause is insufficiently documented for the KafkaProducer

2019-09-17 Thread Ramkishan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramkishan updated KAFKA-8912:
-
Affects Version/s: (was: 2.2.1)
   (was: 0.10.0.1)
   1.0.1

> TimeoutException cause is insufficiently documented for the KafkaProducer
> -
>
> Key: KAFKA-8912
> URL: https://issues.apache.org/jira/browse/KAFKA-8912
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, producer 
>Affects Versions: 1.0.1
>Reporter: Ramkishan
>Priority: Major
>
> The javadocs of the org.apache.kafka.clients.producer.KafkaProducer class have 
> only a partially articulated description for the TimeoutException, wherever 
> applicable.
> The document reads - 
> "{{[TimeoutException|https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]}}
>  - If the time taken for fetching metadata or allocating memory for the 
> record has surpassed {{max.block.ms}}."
> However, this exception can also be thrown if the RTM occurs while the message 
> is in the accumulator and the batch expires. The current description is 
> misleading for developers who try to optimize the producer config when they 
> face this issue.
> [https://kafka.apache.org/23/javadoc/org/apache/kafka/common/errors/TimeoutException.html]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931761#comment-16931761
 ] 

Randall Hauch commented on KAFKA-8523:
--

[~gunnar.morling], good point. I think a tombstone should be left unmodified by 
this SMT, which means no need for a KIP or a config change.

I think converting or dropping a tombstone are separate SMTs.
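
For illustration, a minimal sketch of the guard being discussed, i.e. leaving 
tombstones unmodified in the SMT's {{apply}} (this is not the actual patch, just 
the shape of the fix; the class name and abstract helper are made up):

{code:java}
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Sketch of the proposed behaviour: a tombstone (null value) is returned
// untouched instead of having fields inserted into a null payload.
public abstract class TombstoneSafeInsertField<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(final R record) {
        if (record.value() == null) {
            return record;              // leave tombstones unmodified
        }
        return applyWithFields(record); // existing schemaless / with-schema paths
    }

    // Placeholder for the existing InsertField logic (not reproduced here).
    protected abstract R applyWithFields(R record);
}
{code}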

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931761#comment-16931761
 ] 

Randall Hauch edited comment on KAFKA-8523 at 9/17/19 7:32 PM:
---

[~gunnar.morling], good point. I think a tombstone should be left unmodified by 
this SMT, which means no need for a KIP or a config change.

I think dropping a tombstone or replacing a tombstone with a non-null value are 
separate SMTs.


was (Author: rhauch):
[~gunnar.morling], good point. I think a tombstone should be left unmodified by 
this SMT, which means no need for a KIP or a config change.

I think converting or dropping a tombstone are separate SMTs.

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made working in in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Qihong Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931743#comment-16931743
 ] 

Qihong Chen edited comment on KAFKA-7500 at 9/17/19 7:22 PM:
-

Hi [~ryannedolan], I just listened to your Kafka Power Chat talk, thanks!

I have a follow-up question about the last question (from me) you answered in the 
talk. You said you prefer a dedicated MM2 cluster over running MM2 in a Connect 
cluster, since you can use fewer clusters to do replication among multiple Kafka 
clusters.

But there's no REST API for a dedicated MM2 cluster that can provide the status 
of the replication streams or update the replication configuration. Any change to 
the configuration means updating the config files and restarting all MM2 
instances, is that right? Or did I miss that a dedicated MM2 cluster does provide 
a REST API for admin and monitoring? If so, where is it?

If my understanding is correct, we can achieve the same thing with MM2 in a 
Connect cluster. Assume there are 3 Kafka clusters: A, B, and C. Set up a Connect 
cluster against C (meaning all topics for the connectors' data and state go to 
cluster C), then set up MM2 connectors to replicate data and metadata A -> B and 
B -> A. If this is correct, we can use Kafka cluster C plus the Connect cluster 
running against it to replicate data among more Kafka clusters, like A, B, and D, 
and even more. Of course, this needs more complicated configuration, which 
requires a deeper understanding of how the MM2 connectors work. In this scenario, 
the Connect cluster provides a REST API to administer and monitor all the 
connectors. This would be useful for people who can't use Stream Replication 
Manager from Cloudera or Kafka replicator from Confluent for some reason. Is this 
right?


was (Author: qihong):
Hi [~ryannedolan], just listened to your Kafka Power Chat talk, thanks!

I have a follow-up question about the last question (from me) you answered in the 
talk. You said you prefer a dedicated MM2 cluster over running MM2 in a Connect 
cluster, since you can use fewer clusters to do replication among multiple Kafka 
clusters.

But there's no REST API for a dedicated MM2 cluster that can provide the status 
of the replication streams or update the replication configuration. Any change to 
the configuration means updating the config files and restarting all MM2 
instances, is that right? Or did I miss that a dedicated MM2 cluster does provide 
a REST API for admin and monitoring, and if so, where is it?

If my understanding is correct, we can achieve the same thing with MM2 in a 
Connect cluster. Assume there are 3 Kafka clusters: A, B, and C. Set up a Connect 
cluster against C (meaning all topics for the connectors' data and state go to 
cluster C), then set up MM2 connectors to replicate data and metadata A -> B and 
B -> A. If this is correct, we can use Kafka cluster C plus the Connect cluster 
running against it to replicate data among even more Kafka clusters, like A, B, 
and D. Of course, this needs more complicated configuration, which requires a 
deeper understanding of how the MM2 connectors work. In this scenario, the 
Connect cluster provides a REST API to administer and monitor all the connectors. 
This would be useful for people who can't use Stream Replication Manager from 
Cloudera or Kafka replicator from Confluent for some reason. Is this right?

 

 

 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB

2019-09-17 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931754#comment-16931754
 ] 

Sophie Blee-Goldman commented on KAFKA-8897:


I suppose we could log a warning in #setCompactionOptionsFIFO no matter what ttl 
was set to, in case they did for some reason call setTtl(0). Not sure how much 
we care about supporting no-op methods, though.

> Increase Version of RocksDB
> ---
>
> Key: KAFKA-8897
> URL: https://issues.apache.org/jira/browse/KAFKA-8897
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> A higher version (6+) of RocksDB is needed for some metrics specified in 
> KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Qihong Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931743#comment-16931743
 ] 

Qihong Chen commented on KAFKA-7500:


Hi [~ryannedolan], just listened to your Kafka Power Chat talk, thanks!

I have a follow-up question about the last question (from me) you answered in the 
talk. You said you prefer a dedicated MM2 cluster over running MM2 in a Connect 
cluster, since you can use fewer clusters to do replication among multiple Kafka 
clusters.

But there's no REST API for a dedicated MM2 cluster that can provide the status 
of the replication streams or update the replication configuration. Any change to 
the configuration means updating the config files and restarting all MM2 
instances, is that right? Or did I miss that a dedicated MM2 cluster does provide 
a REST API for admin and monitoring, and if so, where is it?

If my understanding is correct, we can achieve the same thing with MM2 in a 
Connect cluster. Assume there are 3 Kafka clusters: A, B, and C. Set up a Connect 
cluster against C (meaning all topics for the connectors' data and state go to 
cluster C), then set up MM2 connectors to replicate data and metadata A -> B and 
B -> A. If this is correct, we can use Kafka cluster C plus the Connect cluster 
running against it to replicate data among even more Kafka clusters, like A, B, 
and D. Of course, this needs more complicated configuration, which requires a 
deeper understanding of how the MM2 connectors work. In this scenario, the 
Connect cluster provides a REST API to administer and monitor all the connectors. 
This would be useful for people who can't use Stream Replication Manager from 
Cloudera or Kafka replicator from Confluent for some reason. Is this right?

 

 

 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931733#comment-16931733
 ] 

Matthias J. Sax commented on KAFKA-8897:


Might be good enough – not 100% safe though, as one _could_ also call `setTtl(0)`.

> Increase Version of RocksDB
> ---
>
> Key: KAFKA-8897
> URL: https://issues.apache.org/jira/browse/KAFKA-8897
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> A higher version (6+) of RocksDB is needed for some metrics specified in 
> KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB

2019-09-17 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931712#comment-16931712
 ] 

Sophie Blee-Goldman commented on KAFKA-8897:


We could intercept the CompactionOptionsFIFO object in 
RocksDBAdapterClassWithLongName#setCompactionOptionsFIFO and get its ttl; if 
it's anything other than 0 (the default, i.e. disabled), we know they've called 
the removed method. 
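Roughly, that interception could look like the sketch below. Note that the
adapter method and the {{ttl()}} accessor are assumptions made for illustration
(the RocksDB Java API differs between versions), not verified signatures:

{code:java}
import org.rocksdb.CompactionOptionsFIFO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: warn if a FIFO compaction TTL was configured, since setTtl() is
// gone in newer RocksDB versions. ttl() as the accessor name is an assumption.
public class FifoTtlWarningSketch {

    private static final Logger log = LoggerFactory.getLogger(FifoTtlWarningSketch.class);

    public void setCompactionOptionsFIFO(final CompactionOptionsFIFO compactionOptionsFIFO) {
        if (compactionOptionsFIFO.ttl() != 0) {
            log.warn("CompactionOptionsFIFO ttl is not supported by the bundled RocksDB "
                + "version anymore and will be ignored.");
        }
        // ... delegate to the wrapped Options object as before ...
    }
}
{code}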

> Increase Version of RocksDB
> ---
>
> Key: KAFKA-8897
> URL: https://issues.apache.org/jira/browse/KAFKA-8897
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> A higher version (6+) of RocksDB is needed for some metrics specified in 
> KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931696#comment-16931696
 ] 

Matthias J. Sax commented on KAFKA-8377:


{quote}1. Does this mean that every stateful operation on a KTable must be 
materialized?
{quote}
No. We only need to materialize the result of a KTable operation iff the next 
operation is a stateful transformValues.
{quote}2. If that's the case should the user be notified that they need to use 
a materialized store for such operations e.g. after the topology optimization 
we can suggest that a materialized store needs to be created. I'm not a 100% 
sure if we must force create a StateStore (since users may want to pass 
specific configurations to the statestore)
{quote}
The user does not need to create a store IMHO – we can just do this internally. 
However, to be able to materialize the result, we need to know the 
corresponding `Serdes` – not sure if we need to do anything special about it... 
In some cases, though, if we force materialization we might not have the correct 
`Serdes`, and a user would then need to specify them upstream via `Materialized` 
to avoid runtime errors (if they don't do this and hit a runtime error, it might 
be hard for them to understand the problem...)
{quote}3. Is it also possible that's users are materializing the state on REDIS 
or some other caching mechanism
{quote}
Theoretically yes, but we don't need to worry about this case. If a user plugs 
in a custom store, they would need to specify this upstream anyway, forcing an 
explicit materialization.
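As a rough illustration of explicitly materializing the transformValues() result
with its `Serdes`, so a downstream join reads the stored result instead of
re-invoking the transformer (store name, topics, and the trivial transformer
below are made up; in the real problem case the transformer would be stateful):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TransformValuesMaterializationSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, String> left = builder.table("left-topic",
            Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> right = builder.table("right-topic",
            Consumed.with(Serdes.String(), Serdes.String()));

        // A trivial transformer as a stand-in; the materialization below is what
        // matters: the join looks up the stored result instead of re-invoking the
        // transformer on the parent table.
        final ValueTransformerWithKeySupplier<String, String, String> transformer =
            () -> new ValueTransformerWithKey<String, String, String>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public String transform(final String key, final String value) {
                    return value == null ? null : value.toUpperCase();
                }

                @Override
                public void close() { }
            };

        final KTable<String, String> enriched = left.transformValues(
            transformer,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("enriched-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        final KTable<String, String> joined = enriched.join(right, (l, r) -> l + "|" + r);
        joined.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}
{code}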

> KTable#transformValue might lead to incorrect result in joins
> -
>
> Key: KAFKA-8377
> URL: https://issues.apache.org/jira/browse/KAFKA-8377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Aishwarya Pradeep Kumar
>Priority: Major
>  Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup of the parents table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For stateless operation like filter, this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r' because its state might not be the same 
> any longer.
> Hence, it seems to be required, to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note, that if there 
> would be a consecutive filter() after tranformValue(), it would also be ok to 
> materialize the filter() result. Furthermore, if there is no downstream 
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful 
> #transformValues()` operator.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931677#comment-16931677
 ] 

Matthias J. Sax commented on KAFKA-8914:


Are your brokers on version 2.3.0? Using static group membership does not work 
with older brokers.
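For reference, a minimal client-side sketch of enabling static membership
(instance id, group, and topic names are placeholders); the brokers need to be
on 2.3.0 or newer for this to work:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Static membership; the receiving brokers must be on 2.3.0+.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}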

> Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
> --
>
> Key: KAFKA-8914
> URL: https://issues.apache.org/jira/browse/KAFKA-8914
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Igamr Palsenberg
>Priority: Major
>
>  
> Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any 
> value (I tried valid numbers and just some string values) will result in : 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': 
> java.nio.BufferUnderflowExceptiong.apache.kafka.common.protocol.types.SchemaException:
>  Error reading field 'version': java.nio.BufferUnderflowException at 
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  
> Kafka broker : 
> Kafka 2.3.0, installed using Brew. Default config, except the bind IP, that 
> is set explicitly to localhost.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931671#comment-16931671
 ] 

Gunnar Morling commented on KAFKA-7052:
---

I'm wondering: is it solely a question of having a more meaningful exception, 
or should null rather be returned in this case? Seems like one might want 
either, depending on the use case.

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931670#comment-16931670
 ] 

Matthias J. Sax commented on KAFKA-8911:


Thanks for the PR [~atais] – you can re-trigger a build by adding a comment on 
the PR that contains the phrase "retest this please". I would hold off on 
rerunning the tests until you have a review, though – if you need to update the 
PR, each `push` will trigger retesting anyway, and Jenkins is often overloaded, 
so we try to save cycles where we can.

Btw: please ask questions directly on the PR instead of the Jira ticket – 
that's easier for us. Thanks a lot.

> Implicit TimeWindowedSerde creates Serde with null inner serializer
> ---
>
> Key: KAFKA-8911
> URL: https://issues.apache.org/jira/browse/KAFKA-8911
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>
> {{Serdes.scala}} contains an implicit def timeWindowedSerde as below:
>  
> {code:java}
> implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new 
> WindowedSerdes.TimeWindowedSerde[T]()
> {code}
> It creates a new {{TimeWindowedSerde}} without inner serializer, which is a 
> bug. Even in {{WindowedSerdes.java}} it says that empty constructor is for 
> reflection.
> {code:java}
> // Default constructor needed for reflection object creation
> public TimeWindowedSerde() {
> super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
> }
> public TimeWindowedSerde(final Serde inner) {
>  super(new TimeWindowedSerializer<>(inner.serializer()), new 
> TimeWindowedDeserializer<>(inner.deserializer()));
> }
> {code}
> All of the above fails for me when I try to implicitly access the right Serde:
> {code:java}
> private val twSerde = implicitly[TimeWindowedSerde[String]]
> {code}
> and I have to create the object properly on my own
> {code}
>   private val twSerde = new 
> WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]])
> {code}
> it could be fixed with a proper call in {{Serdes.scala}}
> {code}
>   implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
> WindowedSerdes.TimeWindowedSerde[T] =
> new WindowedSerdes.TimeWindowedSerde[T](tSerde)
> {code}
> But maybe also the scope of the default constructor for {{TimeWindowedSerde}} 
> should be changed?
> BR, Michał



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931662#comment-16931662
 ] 

Gunnar Morling edited comment on KAFKA-8523 at 9/17/19 5:08 PM:


Yes, I was arriving at pretty much the same conclusion; that said, is an option 
even needed? I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified, and that should be good enough. WDYT?

Note, if we wanted to have an option, I think it would allow either passing on 
tombstones unmodified (as suggested above) OR inserting the new field, i.e. 
there wouldn't really be an empty map returned, but a map with a single entry 
for that new field (which is what the current PR is doing). This could be done, 
but as argued it would change the message's nature of being a tombstone, so it's 
probably not desirable?


was (Author: gunnar.morling):
Yes, I was arriving at pretty much the same conclusion; that said, is an option 
even needed? I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified, and that should be good enough. WDYT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made to work in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB

2019-09-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931665#comment-16931665
 ] 

Matthias J. Sax commented on KAFKA-8897:


{quote}Seems like the lack of deprecation if anything means we can only do it 
in a major release. But if we do want to do it in a minor release, I think we 
should at least just log a warning in 2.4 if that method is called and delay 
the upgrade until 2.5.
{quote}
I don't see how we can do this.

While I agree with the incentive to not break compatibility, I don't see how 
*_we_* could deprecate anything. We don't control `CompactionOptionsFIFO`, and 
if user code creates this class, how could we possibly intercept the call to 
`setTtl()` and log a warning? We only control the `Options` that we pass as a 
method parameter.

> Increase Version of RocksDB
> ---
>
> Key: KAFKA-8897
> URL: https://issues.apache.org/jira/browse/KAFKA-8897
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> A higher version (6+) of RocksDB is needed for some metrics specified in 
> KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931662#comment-16931662
 ] 

Gunnar Morling commented on KAFKA-8523:
---

Yes, I was arriving at pretty much the same conclusion; that said, is an option 
even needed? I've come to think that a tombstone should remain just that, a 
tombstone, i.e. I don't see a good reason to inject any value into a tombstone 
really; after all, it has specific semantics -- enabling compaction -- which 
shouldn't be altered. So I would suggest I simply update my PR so that it 
passes on tombstones unmodified, and that should be good enough. WDYT?

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made to work in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8839) Improve logging in Kafka Streams around debugging task lifecycle

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931649#comment-16931649
 ] 

ASF GitHub Bot commented on KAFKA-8839:
---

guozhangwang commented on pull request #7258: KAFKA-8839 : Improve streams 
debug logging
URL: https://github.com/apache/kafka/pull/7258
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve logging in Kafka Streams around debugging task lifecycle 
> -
>
> Key: KAFKA-8839
> URL: https://issues.apache.org/jira/browse/KAFKA-8839
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>
> As a follow up to KAFKA-8831, this Jira will track efforts around improving 
> logging/docs around 
>  
>  * Being able to follow state of tasks from assignment to restoration 
>  * Better detection of misconfigured state store dir 
>  * Docs giving guidance for rebalance time and state store config



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931646#comment-16931646
 ] 

Randall Hauch commented on KAFKA-8523:
--

It seems strange to me that InsertField into a null (tombstone) record would 
create an empty map / struct. Rather, it seems like we need a configuration 
property that defines the behavior on null.

However, that means we need a small KIP to define this property. I'd suggest a 
property like `behavior.on.null` whose values are either `empty` or `skip`. If 
we consider records with null values as tombstones (meaning they represent 
deleted records), defaulting to `skip` makes the most sense to me.

We do have a similar problem with `ExtractField` (see KAFKA-7052), and it'd be 
great if the KIP added that configuration item there as well.
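Just to make that shape concrete, a rough sketch of how such a (hypothetical,
KIP-pending) property could be declared in the transform's ConfigDef; the name
and values are taken from the suggestion above, nothing here is final:

{code:java}
import org.apache.kafka.common.config.ConfigDef;

public class BehaviorOnNullConfigSketch {

    // Hypothetical property from the suggestion above; a KIP would pin this down.
    public static final String BEHAVIOR_ON_NULL_CONFIG = "behavior.on.null";

    public static ConfigDef configDef() {
        return new ConfigDef()
            .define(
                BEHAVIOR_ON_NULL_CONFIG,
                ConfigDef.Type.STRING,
                "skip",                                    // default: pass tombstones through untouched
                ConfigDef.ValidString.in("empty", "skip"),
                ConfigDef.Importance.MEDIUM,
                "What to do when the record value is null: 'skip' leaves the tombstone "
                    + "unmodified, 'empty' applies the insertion to an empty map/struct.");
    }
}
{code}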

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made to work in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side

2019-09-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-8917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pascal Büttiker updated KAFKA-8917:
---
Description: 
As described in KAFKA-7718, headers are promoted by the "triggering" record in 
stateful operations such as Joins. This was very confusing and we spent quite 
some time debugging this.

While ideally we would have full control over this behaviour, as KAFKA-7718 
proposes, I hope we can solve some of the randomness before then:
 * Inner-Join: Keep as is (use the headers of the triggering record)
 * Full-Join: Keep as is (use the headers of the triggering record)
 * Left-Join: *Always pick the headers of the left record.*
 * Right-Join: *Always pick the headers of the right record.*

This behaviour would solve the most pressing issues when dealing with headers 
in Kafka Streams.

*Motivation*:

In a CDC scenario, we usually have to resolve the relational database joins on 
our side, which usually means we enrich one record from a couple of other 
topics. So for a typical CDC use-case, Left-Joins allow the most basic 
de-normalisations from relational data models. Therefore, when we can solve the 
header behaviour for left/right joins, we can actually use Kafka Streams in a 
CDC scenario with joins and headers.

We depend on headers, especially when dealing with tombstone records. There is 
no other way to store additional information. If we do not use tombstone 
records, all default Kafka features around compacted topics and KTables are no 
longer usable. We are able to use custom Transformers to generate the headers 
(basically patching in the missing header support in Kafka Streams), but as soon 
as we use Join/Aggregate we lose control over the headers.

 

 

 

 

 

  was:
As described in KAFKA-7718, headers are promoted by the "triggering" record in 
stateful operations such as Joins. This was very confusing and we spent quite 
some time debugging this.

While we ideally have full control over this behaviour as like the KAFKA-7718 
proposes, I hope we can solve some of the randomness before this:
 * Inner-Join: Keep as is (use the headers of the triggering record)
 * Full-Join: Keep as is (use the headers of the triggering record)
 * Left-Join: *Always pick the headers of the left record.*
 * Right-Join: *Always pick the headers of the right record.*

This behaviour would solve the most pressing issues when dealing with headers 
in Kafka Streams.

*Motivation*:

In a CDC scenario, we usually have to resolve the relational database joins on 
our side, which usually means we enrich one record from a couple of other 
topics. So for a typical CDC use-case, Left-Joins allow the most basic 
de-normalisations from relational data models. Therefore, when we can solve the 
header behaviour for left/right joins, we can actually use Kafka Streams in a 
CDC scenario with joins and headers.

We depend on headers, especially when dealing with tombstone records. There is 
no other way to store additional information. If we do not use tombstone 
records, all default Kafka features around compacted topics and KTables are no 
longer usable. We are able to use custom Transformers to generate the headers 
(basically patching in the missing header support in Kafka Streams), but as soon 
as we use Join/Aggregate we lose control over the headers.

 

 

 

 

 


> When performing a Left/Right-Join, pick the headers of the same side
> 
>
> Key: KAFKA-8917
> URL: https://issues.apache.org/jira/browse/KAFKA-8917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Pascal Büttiker
>Priority: Major
>
> As described in KAFKA-7718, headers are promoted by the "triggering" record 
> in stateful operations such as Joins. This was very confusing and we spent 
> quite some time debugging this.
> While ideally we would have full control over this behaviour, as KAFKA-7718 
> proposes, I hope we can solve some of the randomness before then:
>  * Inner-Join: Keep as is (use the headers of the triggering record)
>  * Full-Join: Keep as is (use the headers of the triggering record)
>  * Left-Join: *Always pick the headers of the left record.*
>  * Right-Join: *Always pick the headers of the right record.*
> This behaviour would solve the most pressing issues when dealing with headers 
> in Kafka Streams.
> *Motivation*:
> In a CDC scenario, we usually have to resolve the relational database joins 
> on our side, which usually means we enrich one record from a couple of other 
> topics. So for a typical CDC use-case, Left-Joins allow the most basic 
> de-normalisations from relational data models. Therefore, when we can solve 
> the header behaviour for left/right joins, we can actually use Kafka Streams 
> in a CDC scenario with joins and headers.

[jira] [Updated] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side

2019-09-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-8917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pascal Büttiker updated KAFKA-8917:
---
Description: 
As described in KAFKA-7718, headers are promoted by the "triggering" record in 
stateful operations such as Joins. This was very confusing and we spent quite 
some time debugging this.

While ideally we would have full control over this behaviour, as KAFKA-7718 
proposes, I hope we can solve some of the randomness before then:
 * Inner-Join: Keep as is (use the headers of the triggering record)
 * Full-Join: Keep as is (use the headers of the triggering record)
 * Left-Join: *Always pick the headers of the left record.*
 * Right-Join: *Always pick the headers of the right record.*

This behaviour would solve the most pressing issues when dealing with headers 
in Kafka Streams.

*Motivation*:

In a CDC scenario, we usually have to resolve the relational database joins on 
our side, which usually means we enrich one record from a couple of other 
topics. So for a typical CDC use-case, Left-Joins allow the most basic 
de-normalisations from relational data models. Therefore, when we can solve the 
header behaviour for left/right joins, we can actually use Kafka Streams in a 
CDC scenario with joins and headers.

We depend on headers, especially when dealing with tombstone records. There is 
no other way to store additional information. If we do not use tombstone 
records, all default Kafka features around compacted topics and KTables are no 
longer usable. We are able to use custom Transformers to generate the headers 
(basically patching in the missing header support in Kafka Streams), but as soon 
as we use Join/Aggregate we lose control over the headers.

 

 

 

 

 

  was:
As described in KAFKA-7718, headers are promoted by the "triggering" record in 
stateful operations such as Joins. This was very confusing and we spent quite 
some time debugging this.

While ideally we would have full control over this behaviour, as KAFKA-7718 
proposes, I hope we can solve some of the randomness before then:
 * Inner-Join: Keep as is (use the headers of the triggering record)
 * Full-Join: Keep as is (use the headers of the triggering record)
 * Left-Join: *Always pick the headers of the left record.*
 * Right-Join: *Always pick the headers of the right record.*

This behaviour would solve the most pressing issues when dealing with headers 
in Kafka Streams.

*Motivation*:

We depend on headers, especially when dealing with tombstone records. There is 
no other way to store additional information. If we do not use tombstone 
records, all default Kafka features around compacted topics and KTables are no 
longer usable. We are able to use custom Transformers to generate the headers 
(basically patching in the missing header support in Kafka Streams), but as soon 
as we use Join/Aggregate we lose control over the headers.

 

 

 

 

 


> When performing a Left/Right-Join, pick the headers of the same side
> 
>
> Key: KAFKA-8917
> URL: https://issues.apache.org/jira/browse/KAFKA-8917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Pascal Büttiker
>Priority: Major
>
> As described in KAFKA-7718, headers are promoted by the "triggering" record 
> in stateful operations such as Joins. This was very confusing and we spent 
> quite some time debugging this.
> While ideally we would have full control over this behaviour, as KAFKA-7718 
> proposes, I hope we can solve some of the randomness before then:
>  * Inner-Join: Keep as is (use the headers of the triggering record)
>  * Full-Join: Keep as is (use the headers of the triggering record)
>  * Left-Join: *Always pick the headers of the left record.*
>  * Right-Join: *Always pick the headers of the right record.*
> This behaviour would solve the most pressing issues when dealing with headers 
> in Kafka Streams.
> *Motivation*:
> In a CDC scenario, we usually have to resolve the relational database joins 
> on our side, which usually means we enrich one record from a couple of other 
> topics. So for a typical CDC use-case, Left-Joins allow the most basic 
> de-normalisations from relational data models. Therefore, when we can solve 
> the header behaviour for left/right joins, we can actually use Kafka Streams 
> in a CDC scenario with joins and headers.
> We depend on headers, especially when dealing with tombstone records. There 
> is no other way to store additional information. If we do not use tombstone 
> records, all default Kafka features around compacted topics and KTables are 
> no longer usable. We are able to use custom Transformers to generate the 
> headers (basically patching in the missing header support in Kafka Streams), but 
> as soon as we use 

[jira] [Created] (KAFKA-8917) When performing a Left/Right-Join, pick the headers of the same side

2019-09-17 Thread Jira
Pascal Büttiker created KAFKA-8917:
--

 Summary: When performing a Left/Right-Join, pick the headers of 
the same side
 Key: KAFKA-8917
 URL: https://issues.apache.org/jira/browse/KAFKA-8917
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Pascal Büttiker


As described in KAFKA-7718, headers are promoted by the "triggering" record in 
stateful operations such as Joins. This was very confusing and we spent quite 
some time debugging this.

While ideally we would have full control over this behaviour, as KAFKA-7718 
proposes, I hope we can solve some of the randomness before then:
 * Inner-Join: Keep as is (use the headers of the triggering record)
 * Full-Join: Keep as is (use the headers of the triggering record)
 * Left-Join: *Always pick the headers of the left record.*
 * Right-Join: *Always pick the headers of the right record.*

This behaviour would solve the most pressing issues when dealing with headers 
in Kafka Streams.

*Motivation*:

We depend on headers, especially when dealing with tombstone records. There is 
no other way to store additional information. If we do not use tombstone 
records, all default Kafka features around compacted topics and KTables are no 
longer usable. We are able to use custom Transformers to generate the headers 
(basically patching in the missing header support in Kafka Streams), but as soon 
as we use Join/Aggregate we lose control over the headers.
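(For illustration only -- the header key and value below are made up -- the kind 
of Transformer-based header patching mentioned above looks roughly like this:)

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: a Transformer that stamps a custom header onto every record. Headers
// set this way are what later get propagated from the "triggering" record
// through joins, which is the behaviour this ticket wants to make deterministic.
public class HeaderStampingTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        context.headers().add("source-table", "left".getBytes(StandardCharsets.UTF_8));
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
}
{code}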

 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer

2019-09-17 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931614#comment-16931614
 ] 

Michał commented on KAFKA-8911:
---

The test failure seems a bit random to me, but I cannot restart it.

> Implicit TimeWindowedSerde creates Serde with null inner serializer
> ---
>
> Key: KAFKA-8911
> URL: https://issues.apache.org/jira/browse/KAFKA-8911
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>
> {{Serdes.scala}} contains an implicit def timeWindowedSerde as below:
>  
> {code:java}
> implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new 
> WindowedSerdes.TimeWindowedSerde[T]()
> {code}
> It creates a new {{TimeWindowedSerde}} without inner serializer, which is a 
> bug. Even in {{WindowedSerdes.java}} it says that empty constructor is for 
> reflection.
> {code:java}
> // Default constructor needed for reflection object creation
> public TimeWindowedSerde() {
> super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
> }
> public TimeWindowedSerde(final Serde inner) {
>  super(new TimeWindowedSerializer<>(inner.serializer()), new 
> TimeWindowedDeserializer<>(inner.deserializer()));
> }
> {code}
> All of the above fails for me when I try to implicitly access the right Serde:
> {code:java}
> private val twSerde = implicitly[TimeWindowedSerde[String]]
> {code}
> and I have to create the object properly on my own
> {code}
>   private val twSerde = new 
> WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]])
> {code}
> it could be fixed with a proper call in {{Serdes.scala}}
> {code}
>   implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
> WindowedSerdes.TimeWindowedSerde[T] =
> new WindowedSerdes.TimeWindowedSerde[T](tSerde)
> {code}
> But maybe also the scope of the default constructor for {{TimeWindowedSerde}} 
> should be changed?
> BR, Michał



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931597#comment-16931597
 ] 

Ryanne Dolan commented on KAFKA-7500:
-

[~chridtian.hagel] we try to be smart about what config properties are 
replicated and which are left as defaults:

- we have a config property blacklist: 
https://github.com/apache/kafka/pull/6295/files#diff-1ae39c06c52ded296030121b13d4b791R33
- we don't replicate a config property that is inherited from the cluster 
default or from the static broker config, i.e. we only replicate properties 
that are explicitly set for a topic.
- we don't replicate read-only or sensitive properties for obvious reasons.

cleanup.policy is one that _should_ be replicated, generally, unless you are 
expecting the default value to be replicated. Also, be advised that config sync 
is only periodic, with a default interval of 10 minutes.

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8916) Unreliable kafka-reassign-partitions.sh affecting performance

2019-09-17 Thread VinayKumar (Jira)
VinayKumar created KAFKA-8916:
-

 Summary: Unreliable kafka-reassign-partitions.sh affecting 
performance
 Key: KAFKA-8916
 URL: https://issues.apache.org/jira/browse/KAFKA-8916
 Project: Kafka
  Issue Type: Task
  Components: admin, config
Affects Versions: 2.1.1
 Environment: CentOS 7
Reporter: VinayKumar


Currently I have a 3-node Kafka cluster, and I want to add 2 more nodes to make 
it a 5-node cluster.

After adding the nodes to the cluster, I need all the topic partitions to be 
evenly distributed across all 5 nodes.

In the past, when I ran kafka-reassign-partitions.sh and 
kafka-preferred-replica-election.sh, they ran for a very long time, hung, and 
made the cluster unstable. So I'm afraid to use this method.

Can you please suggest the best and most foolproof way to assign partitions 
among all the cluster nodes?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8471) Replace control requests/responses with automated protocol

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931533#comment-16931533
 ] 

ASF GitHub Bot commented on KAFKA-8471:
---

ijuma commented on pull request #7353: KAFKA-8471: Replace control 
requests/responses with automated protocol
URL: https://github.com/apache/kafka/pull/7353
 
 
   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace control requests/responses with automated protocol
> --
>
> Key: KAFKA-8471
> URL: https://issues.apache.org/jira/browse/KAFKA-8471
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8577) Flaky Test `DistributedHerderTest.testJoinLeaderCatchUpFails`

2019-09-17 Thread Warren Marivel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931525#comment-16931525
 ] 

Warren Marivel commented on KAFKA-8577:
---

Hi,

 

I'm looking at contributing to Kafka and noticed that this test was fixed while 
searching for tickets to tackle.

[https://github.com/apache/kafka/commit/11b25a13ee72e1c91facd0f94b888215f67d5a69]

Should this Jira be marked as resolved?

 

Thanks,

Warren

 

> Flaky Test `DistributedHerderTest.testJoinLeaderCatchUpFails`
> -
>
> Key: KAFKA-8577
> URL: https://issues.apache.org/jira/browse/KAFKA-8577
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> Started seeing this regularly:
> {code:java}
> java.lang.AssertionError: 
>   Unexpected method call WorkerGroupMember.maybeLeaveGroup("taking too long 
> to read the log"):
> WorkerGroupMember.ensureActive(): expected: 2, actual: 1
> WorkerGroupMember.wakeup(): expected: 2, actual: 1
> WorkerGroupMember.maybeLeaveGroup("test join leader catch up fails"): 
> expected: 1, actual: 0
> WorkerGroupMember.requestRejoin(): expected: 1, actual: 0
> WorkerGroupMember.poll(): expected: 1, actual: 0{code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Christian Hagel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473
 ] 

Christian Hagel edited comment on KAFKA-7500 at 9/17/19 1:50 PM:
-

[~ryannedolan] increasing the number of tasks and playing around with the 
producer.buffer.memory variable helped a lot to get rid of the above-mentioned 
error.

 

However, I think I stumbled over a bug with the topic config syncing. When 
starting the mm2 with the following config:
{code:java}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "",
"replication.policy.separator": "",
"target.cluster.alias": "B",
"source.cluster.bootstrap.servers": "ip:9092",
"target.cluster.bootstrap.servers": "ip:9093",
"sync.topic.acls.enabled": "false",
"replication.factor": "1",
"internal.topic.replication.factor": "1",
"topics": ".*",
"enabled": "true",
"rename.topics": "false",
"replication.factor": 1,
"refresh.topics": "true",
"refresh.groups": "true",
"sync.topic.configs": "true"
  }
{code}
it indeed does create all topics from the source cluster in the target cluster, 
with the correct number of partitions.

Other config parameters like cleanup.policy remain at the cluster default. I 
thought at first insufficient ACLs might be the cause, but I also replicated the 
behavior with a simple docker-compose setup. Did I miss a configuration? 
Or is this even the intended behavior?


was (Author: chridtian.hagel):
[~ryannedolan] increasing the number of tasks and playing around with the 
producer.buffer.memory variable helped a lot to get rid of the above-mentioned 
error.

 

However, I think I stumbled over a bug with the topic config syncing. When 
starting the mm2 with the following config:
{code:java}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "",
"replication.policy.separator": "",
"target.cluster.alias": "B",
"source.cluster.bootstrap.servers": "ip:9092",
"target.cluster.bootstrap.servers": "ip:9093",
"sync.topic.acls.enabled": "false",
"replication.factor": "1",
"internal.topic.replication.factor": "1",
"topics": ".*",
"enabled": "true",
"rename.topics": "false",
"replication.factor": 1,
"refresh.topics": "true",
"refresh.groups": "true",
"sync.topic.configs": "true"
  }
{code}
it indeed does create all topics from the source cluster in the target cluster, 
with the correct number of partitions.

Other config parameters like cleanup.policy remain in the cluster default. I 
thought at first insufficient acls might be the cause, but I replicated the 
behavior also with a simple docker-compose setup. Did I miss a configuration? 
Or is this even the intendet behavior

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Christian Hagel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473
 ] 

Christian Hagel edited comment on KAFKA-7500 at 9/17/19 1:50 PM:
-

[~ryannedolan] increasing the number of tasks and playing around with the 
producer.buffer.memory variable helped a lot to get rid of the above-mentioned 
error.

 

However, I think I stumbled over a bug with the topic config syncing. When 
starting the mm2 with the following config:
{code:java}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "",
"replication.policy.separator": "",
"target.cluster.alias": "B",
"source.cluster.bootstrap.servers": "ip:9092",
"target.cluster.bootstrap.servers": "ip:9093",
"sync.topic.acls.enabled": "false",
"replication.factor": "1",
"internal.topic.replication.factor": "1",
"topics": ".*",
"enabled": "true",
"rename.topics": "false",
"replication.factor": 1,
"refresh.topics": "true",
"refresh.groups": "true",
"sync.topic.configs": "true"
  }
{code}
it indeed does create all topics from the source cluster in the target cluster, 
with the correct number of partitions.

Other config parameters like cleanup.policy remain at the cluster default. I 
thought at first insufficient ACLs might be the cause, but I also replicated the 
behavior with a simple docker-compose setup. Did I miss a configuration? 
Or is this even the intended behavior?


was (Author: chridtian.hagel):
[~ryannedolan] increasing the number of tasks and playing around with the 
producer.buffer.memory variable helped a lot to get rid of the above-mentioned 
error.

 

However, I think I stumbled over a bug with the topic config syncing. When 
starting the mm2 with the following config:
{code:java}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "",
"replication.policy.separator": "",
"target.cluster.alias": "B",
"source.cluster.bootstrap.servers": "ip:9092",
"target.cluster.bootstrap.servers": "ip:9093",
"sync.topic.acls.enabled": "false",
"replication.factor": "1",
"internal.topic.replication.factor": "1",
"topics": ".*",
"enabled": "true",
"rename.topics": "false",
"replication.factor": 1,
"refresh.topics": "true",
"refresh.groups": "true",
"sync.topic.configs": "true"
  }
{code}
it indeed does create all topics from the source cluster in the target cluster, 
with the correct number of partitions.

Other config parameters like cleanup.policy remain in the cluster default. I 
thought at first insufficient acls might be the cause, but I replicated the 
behavior also with a simple docker-compose setup. Did I miss a configuration? 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-09-17 Thread Christian Hagel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931473#comment-16931473
 ] 

Christian Hagel commented on KAFKA-7500:


[~ryannedolan] increasing the number of tasks and playing around with the 
producer.buffer.memory variable helped a lot to get rid of the above-mentioned 
error.

 

However, I think I stumbled over a bug with the topic config syncing. When 
starting the mm2 with the following config:
{code:java}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "",
"replication.policy.separator": "",
"target.cluster.alias": "B",
"source.cluster.bootstrap.servers": "ip:9092",
"target.cluster.bootstrap.servers": "ip:9093",
"sync.topic.acls.enabled": "false",
"replication.factor": "1",
"internal.topic.replication.factor": "1",
"topics": ".*",
"enabled": "true",
"rename.topics": "false",
"replication.factor": 1,
"refresh.topics": "true",
"refresh.groups": "true",
"sync.topic.configs": "true"
  }
{code}
it indeed does create all topics from the source cluster in the target cluster, 
with the correct number of partitions.

Other config parameters like cleanup.policy remain in the cluster default. I 
thought at first insufficient acls might be the cause, but I replicated the 
behavior also with a simple docker-compose setup. Did I miss a configuration? 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Manikumar
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8523) InsertField transformation fails when encountering tombstone event

2019-09-17 Thread Frederic Tardif (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931374#comment-16931374
 ] 

Frederic Tardif commented on KAFKA-8523:


I believe an option should be exposed to configure the behaviour in order to 
either pass the null value unmodified or to apply the insert on an empty map. 

> InsertField transformation fails when encountering tombstone event
> --
>
> Key: KAFKA-8523
> URL: https://issues.apache.org/jira/browse/KAFKA-8523
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> When applying the {{InsertField}} transformation to a tombstone event, an 
> exception is raised:
> {code}
> org.apache.kafka.connect.errors.DataException: Only Map objects supported in 
> absence of schema for [field insertion], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.InsertField.applySchemaless(InsertField.java:138)
>   at 
> org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:131)
>   at 
> org.apache.kafka.connect.transforms.InsertFieldTest.tombstone(InsertFieldTest.java:128)
> {code}
> AFAICS, the transform can still be made to work in this case by simply 
> building up a new value map from scratch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8911) Implicit TimeWindowedSerde creates Serde with null inner serializer

2019-09-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931314#comment-16931314
 ] 

ASF GitHub Bot commented on KAFKA-8911:
---

atais commented on pull request #7352: KAFKA-8911: Using proper WindowSerdes 
constructors in their implicit definitions
URL: https://github.com/apache/kafka/pull/7352
 
 
   Detailed info is available in the ticket: 
https://issues.apache.org/jira/browse/KAFKA-8911
   
   Briefly, the `implicit defs` are calling empty constructors, which exist only 
for reflection object creation.
   Therefore, while using the implicit definitions, an NPE occurs when the Serde is 
called.
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implicit TimeWindowedSerde creates Serde with null inner serializer
> ---
>
> Key: KAFKA-8911
> URL: https://issues.apache.org/jira/browse/KAFKA-8911
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>
> {{Serdes.scala}} contains an implicit def timeWindowedSerde as below:
>  
> {code:java}
> implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new 
> WindowedSerdes.TimeWindowedSerde[T]()
> {code}
> It creates a new {{TimeWindowedSerde}} without an inner serializer, which is a 
> bug. Even {{WindowedSerdes.java}} itself says that the empty constructor exists 
> only for reflection.
> {code:java}
> // Default constructor needed for reflection object creation
> public TimeWindowedSerde() {
> super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
> }
> public TimeWindowedSerde(final Serde inner) {
>  super(new TimeWindowedSerializer<>(inner.serializer()), new 
> TimeWindowedDeserializer<>(inner.deserializer()));
> }
> {code}
> All of the above fails for me when I try to access the right Serde implicitly:
> {code:java}
> private val twSerde = implicitly[TimeWindowedSerde[String]]
> {code}
> and I have to create the object properly on my own
> {code}
>   private val twSerde = new 
> WindowedSerdes.TimeWindowedSerde[String](implicitly[Serde[String]])
> {code}
> it could be fixed with a proper call in {{Serdes.scala}}
> {code}
>   implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
> WindowedSerdes.TimeWindowedSerde[T] =
> new WindowedSerdes.TimeWindowedSerde[T](tSerde)
> {code}
> But maybe also the scope of the default constructor for {{TimeWindowedSerde}} 
> should be changed?
> BR, Michał



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8915) Cannot modify partitions

2019-09-17 Thread lingyi.zhong (Jira)
lingyi.zhong created KAFKA-8915:
---

 Summary: Cannot modify partitions
 Key: KAFKA-8915
 URL: https://issues.apache.org/jira/browse/KAFKA-8915
 Project: Kafka
  Issue Type: Bug
Reporter: lingyi.zhong


[root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 --replication-factor 1 --partitions 1 --topic test_topic3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_topic3".

[root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partition 2 --topic test_topic3
Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option
	at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
	at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
	at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
	at joptsimple.OptionParser.parse(OptionParser.java:396)
	at kafka.admin.TopicCommand$TopicCommandOptions.(TopicCommand.scala:358)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
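For what it's worth, the exception complains about the option name itself; the alter path 
of kafka-topics.sh takes the plural flag, roughly as in the sketch below (same zookeeper 
string as in the report, and assuming the topic actually exists under that path):
{code}
bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partitions 2 --topic test_topic3
{code}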



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8910) Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion

2019-09-17 Thread Sergey Ushakov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Ushakov updated KAFKA-8910:
--
Description: 
h1. Problem

Javadoc for org.apache.kafka.clients.producer.Callback states:
{noformat}
* @param metadata The metadata for the record that was sent (i.e. the partition 
and offset). Null if an error
*occurred. {noformat}
In fact, metadata is never null since KAFKA-6180. See 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
 for details.

  was:
h1. Problem

Javadoc for 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
 states:
{noformat}
* @param metadata The metadata for the record that was sent (i.e. the partition 
and offset). Null if an error
*occurred. {noformat}
In fact, metadata is never null since KAFKA-6180. See 
org.apache.kafka.clients.producer.Callback for details.


> Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion
> ---
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Assignee: Lee Dongjin
>Priority: Major
>
> h1. Problem
> Javadoc for org.apache.kafka.clients.producer.Callback states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  for details.
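To illustrate the documented contract, here is a minimal producer callback sketch that 
checks the exception argument rather than testing metadata for null; the broker address 
and topic name are placeholders:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"), (metadata, exception) -> {
                // Since KAFKA-6180, metadata is non-null even on failure, so check the exception instead.
                if (exception != null) {
                    System.err.println("send to " + metadata.topic() + " failed: " + exception);
                } else {
                    System.out.println("sent to partition " + metadata.partition() + " at offset " + metadata.offset());
                }
            });
        }
    }
}
{code}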



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8584) Allow "bytes" type to generated a ByteBuffer rather than byte arrays

2019-09-17 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931154#comment-16931154
 ] 

Tom Bentley commented on KAFKA-8584:


I for one would like to see this discussed in a KIP, even if it doesn't change 
the serialized form and therefore isn't really a public API in the strictest 
sense.

> Allow "bytes" type to generated a ByteBuffer rather than byte arrays
> 
>
> Key: KAFKA-8584
> URL: https://issues.apache.org/jira/browse/KAFKA-8584
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: newbie
>
> Right now in the RPC definition, the type {{bytes}} is translated into 
> {{byte[]}} in generated Java code. However, for some requests like 
> ProduceRequest#partitionData, the underlying type would be better represented 
> as a ByteBuffer rather than a byte array.
> One proposal is to add an additional boolean tag {{useByteBuffer}} for the 
> {{bytes}} type, which defaults to false; when set to {{true}}, the 
> corresponding field is generated as {{ByteBuffer}} instead of {{byte[]}}. 
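As a rough illustration of why {{ByteBuffer}} is attractive here (a generic sketch, not 
the generated-code format): a {{byte[]}} accessor typically forces a defensive copy, 
while a {{ByteBuffer}} view can expose the same memory without copying.
{code:java}
import java.nio.ByteBuffer;
import java.util.Arrays;

// Generic sketch contrasting a copying byte[] accessor with a zero-copy ByteBuffer view.
public class BytesFieldSketch {
    private final byte[] backing = "some record payload".getBytes();

    // byte[] style: callers get their own copy of the data.
    public byte[] dataAsArray() {
        return Arrays.copyOf(backing, backing.length);
    }

    // ByteBuffer style: callers get a read-only view over the same memory, no copy.
    public ByteBuffer dataAsBuffer() {
        return ByteBuffer.wrap(backing).asReadOnlyBuffer();
    }

    public static void main(String[] args) {
        BytesFieldSketch sketch = new BytesFieldSketch();
        System.out.println(sketch.dataAsArray().length);
        System.out.println(sketch.dataAsBuffer().remaining());
    }
}
{code}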



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8914) Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error

2019-09-17 Thread Igamr Palsenberg (Jira)
Igamr Palsenberg created KAFKA-8914:
---

 Summary: Setting GROUP_INSTANCE_ID_CONFIG leads to a protobuf error
 Key: KAFKA-8914
 URL: https://issues.apache.org/jira/browse/KAFKA-8914
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
Reporter: Igamr Palsenberg


 

Setting ConsumerConfig.GROUP_INSTANCE_ID_CONFIG (group.instance.id) to any 
value (I tried valid numbers and just some string values) results in: 

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)

 

Kafka broker: 

Kafka 2.3.0, installed using Brew. Default config, except the bind IP, which is 
set explicitly to localhost.
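A minimal repro sketch of the failing configuration; broker address, group id, instance 
id, and topic name are placeholders:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // placeholder
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1");     // the setting in question
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));      // placeholder topic
            // The SchemaException above is thrown from this poll() during the group join.
            consumer.poll(Duration.ofSeconds(5));
        }
    }
}
{code}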

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)