[GitHub] [kafka] mjsax commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-19 Thread GitBox


mjsax commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-675900333


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-19 Thread GitBox


cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675909416


   If we assume that the same thread adds and removes the state-store-level 
metrics (which is currently the case), we just need thread-safe maps 
`storeLevelSensors` and `storeLevelMetrics` and can forgo `synchronized`. This 
can be done by using `ConcurrentMap` instead of `Map`. However, with the 
current API we do not make that assumption, because we pass in the thread ID. 
I would be really happy to get rid of `synchronized` and make the changes to 
the API for state-store-level metrics. Afterwards, we should also consider 
adapting the other levels.
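A minimal sketch of the idea (the class and map contents below are simplified and hypothetical, not the actual Streams metrics code): with a `ConcurrentHashMap`, single-key operations are atomic, so the explicit `synchronized` blocks can be dropped as long as the same thread adds and removes a given store's entries.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a thread-safe registry of store-level sensors.
// ConcurrentHashMap replaces a plain HashMap guarded by `synchronized`.
class StoreLevelSensors {
    private final ConcurrentMap<String, String> storeLevelSensors = new ConcurrentHashMap<>();

    void addSensor(String storeName, String sensorName) {
        // putIfAbsent is atomic on its own, so no external lock is needed
        storeLevelSensors.putIfAbsent(storeName, sensorName);
    }

    String getSensor(String storeName) {
        return storeLevelSensors.get(storeName);
    }

    void removeSensor(String storeName) {
        storeLevelSensors.remove(storeName);
    }
}
```

Note that this only removes the lock for single-map operations; compound check-then-act sequences spanning both maps would still need coordination, which is why the single-owning-thread assumption matters.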







[GitHub] [kafka] cadonna edited a comment on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-19 Thread GitBox


cadonna edited a comment on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675909416


   If we assume that the same thread adds and removes the state-store-level 
metrics (which is currently the case), we just need thread-safe maps 
`storeLevelSensors` and `storeLevelMetrics` and can forgo `synchronized`. This 
can be done by using `ConcurrentMap` instead of `Map`. However, with the 
current API we do not make that assumption, because we pass in the thread ID. 
I would be really happy to get rid of `synchronized` and make the changes to 
the API for state-store-level metrics. Afterwards, we should also consider 
adapting the other levels. WDYT?







[jira] [Commented] (KAFKA-10363) Broker tries to connect to a new cluster when there are changes in zookeeper.connect properties

2020-08-19 Thread Alexey Kornev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180346#comment-17180346
 ] 

Alexey Kornev commented on KAFKA-10363:
---

[~RensGroothuijsen] Unfortunately, this is not an option: even if we delete 
meta.properties, the broker tries to connect to the new cluster, not to the 
old one.

> Broker tries to connect to a new cluster when there are changes in 
> zookeeper.connect properties
> -
>
> Key: KAFKA-10363
> URL: https://issues.apache.org/jira/browse/KAFKA-10363
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.3.1
> Environment: 3 Kafka brokers (v2.3.1, v2.4.0) with Zookeeper cluster 
> (3.4.10)
> Ubuntu 18.04 LTS
>Reporter: Alexey Kornev
>Priority: Critical
>
> We've just successfully set up a Kafka cluster consisting of 3 brokers and 
> faced the following issue: when we change the order of zookeeper servers in 
> the zookeeper.connect property in server.properties and restart a Kafka 
> broker, that broker tries to connect to a new Kafka cluster. As a result, 
> the broker throws an error and shuts down.
> For example, config server.properties on first broker:
> {code:java}
> broker.id=-1
> ...
> zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
> {code}
>  We changed it to 
> {code:java}
> broker.id=-1
> ...
> zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka {code}
> and restarted the Kafka broker. 
> Logs:
> {code:java}
> [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
> 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
> 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2020-08-05 
> 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean 
> (kafka.utils.Log4jControllerRegistration$)[2020-08-05 09:07:57,656] INFO 
> Registered signal handlers for TERM, INT, HUP 
> (org.apache.kafka.common.utils.LoggingSignalHandler)[2020-08-05 09:07:57,657] 
> INFO starting (kafka.server.KafkaServer)[2020-08-05 09:07:57,658] INFO 
> Connecting to zookeeper on 
> node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka 
> (kafka.server.KafkaServer)[2020-08-05 09:07:57,685] INFO [ZooKeeperClient 
> Kafka server] Initializing a new session to node_2:2181. 
> (kafka.zookeeper.ZooKeeperClient)[2020-08-05 09:07:57,690] INFO Client 
> environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf,
>  built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)[2020-08-05 
> 09:07:57,693] INFO Client environment:host.name=localhost 
> (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,693] INFO Client 
> environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)[2020-08-05 
> 09:07:57,696] INFO Client environment:java.vendor=Ubuntu 
> (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client 
> environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 
> (org.apache.zookeeper.ZooKeeper)[2020-08-05 09:07:57,696] INFO Client 
> environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/../libs/jakarta.activatio

[jira] [Created] (KAFKA-10420) group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation when running kafka client 2.6 against broker 2.3.1

2020-08-19 Thread Tomasz Kaszuba (Jira)
Tomasz Kaszuba created KAFKA-10420:
--

 Summary: group instance id Optional.empty failed due to 
UNKNOWN_MEMBER_ID, resetting generation when running kafka client 2.6 against 
broker 2.3.1
 Key: KAFKA-10420
 URL: https://issues.apache.org/jira/browse/KAFKA-10420
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
Reporter: Tomasz Kaszuba


After upgrading our Kafka clients to 2.6.0 and running them against broker 
version 2.3.1, we got errors where the consumer groups are reset. We did not 
notice this happening with client 2.5.0.
{noformat}
2020-08-17 04:35:27.787  INFO 1 --- [-StreamThread-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Attempt to 
heartbeat with Generation{generationId=11, 
memberId='ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer-3902e2a9-1755-466b-9255-d144be25876f',
 protocol='stream'} and group instance id Optional.empty failed due to 
UNKNOWN_MEMBER_ID, resetting generation2020-08-17 04:35:27.787  INFO 1 --- 
[-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Giving away all 
assigned partitions as lost since generation has been reset,indicating that 
consumer is no longer part of the group2020-08-17 04:35:27.787  INFO 1 --- 
[-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Lost previously 
assigned partitions ieb.publish.baseline_pc.incurred_pattern-02020-08-17 
04:35:27.787  INFO 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread 
    : stream-thread 
[ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1]
 at state RUNNING: partitions [ieb.publish.baseline_pc.incurred_pattern-0] lost 
due to missed rebalance.    lost active tasks: [0_0]    lost assigned 
standby tasks: []2020-08-17 04:35:27.787  INFO 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1]
 task [0_0] Suspended running 2020-08-17 04:35:27.788  INFO 1 --- 
[-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions 
2020-08-17 04:35:27.789  INFO 1 --- [-StreamThread-1] 
o.a.k.s.p.internals.RecordCollectorImpl  : stream-thread 
[ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1]
 task [0_0] Closing record collector dirty 2020-08-17 04:35:27.790  INFO 1 --- 
[-StreamThread-1] o.a.k.s.processor.internals.StreamTask   : stream-thread 
[ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1]
 task [0_0] Closed dirty 2020-08-17 04:35:27.790  INFO 1 --- [-StreamThread-1] 
o.a.k.clients.producer.KafkaProducer : [Producer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-0_0-producer,
 transactionalId=ieb-x07-baseline-pc-data-storage-incurred-pattern-0_0] Closing 
the Kafka producer with timeoutMillis = 9223372036854775807 ms. 2020-08-17 
04:35:27.791  INFO 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread 
    : stream-thread 
[ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1]
 partitions lost took 4 ms. 2020-08-17 04:35:27.791  INFO 1 --- 
[-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] (Re-)joining group 
2020-08-17 04:35:27.795  INFO 1 --- [-StreamThread-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-36fbee26-0c5f-4993-a203-f34c0cac7caf-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Join group failed 
with org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
needs to have a valid member id before actually entering a consumer group. 
2020-08-17 04:35:27.795  INFO 1 ---

[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-19 Thread GitBox


cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-676001391


   I removed `synchronized` around adding and removing store-level sensors and 
metrics. Let me know what you think. If you do not like it, we can revert the 
last commit. 







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-19 Thread GitBox


rajinisivaram commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472885774



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -185,10 +192,10 @@ public void close() {
 deleteAllTopics(backup.kafka());
 primary.stop();
 backup.stop();
+assertFalse(exited.get());
+Exit.resetExitProcedure();

Review comment:
   I am guessing we ran into an issue because of exit, so perhaps this 
reset ought to be in a finally block?
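The suggested shape can be sketched as follows. `Exit` here is a minimal stand-in for Kafka's `org.apache.kafka.common.utils.Exit` helper, included only so the sketch is self-contained; the point is that the reset runs even when the assertion throws.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: reset the overridden exit procedure in `finally` so a failing
// assertion cannot leave it installed for subsequent tests.
class ExitResetSketch {
    // Minimal stand-in for org.apache.kafka.common.utils.Exit
    static class Exit {
        static Runnable procedure = () -> { };
        static void setExitProcedure(Runnable p) { procedure = p; }
        static void resetExitProcedure() { procedure = () -> { }; }
    }

    static void setUp(AtomicBoolean exited) {
        // record, rather than perform, any exit triggered during the test
        Exit.setExitProcedure(() -> exited.set(true));
    }

    static boolean tearDown(AtomicBoolean exited) {
        try {
            // this assertion may throw...
            if (exited.get()) {
                throw new AssertionError("test triggered an unexpected exit");
            }
            return true;
        } finally {
            // ...but the reset runs either way
            Exit.resetExitProcedure();
        }
    }
}
```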









[jira] [Created] (KAFKA-10421) Kafka Producer deadlocked on get call

2020-08-19 Thread Ranadeep Deb (Jira)
Ranadeep Deb created KAFKA-10421:


 Summary: Kafka Producer deadlocked on get call
 Key: KAFKA-10421
 URL: https://issues.apache.org/jira/browse/KAFKA-10421
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
 Environment: CentOS7
Reporter: Ranadeep Deb


I have been experiencing a similar issue in 2.3.0.

I have a multi-threaded application with each thread sending an individual 
message to the broker. There are instances where I have observed that the 
Producer threads get stuck on the Producer.send().get() call. I was not sure 
what was causing this issue, but after landing on this thread 
(https://issues.apache.org/jira/browse/KAFKA-8135) I suspect that 
intermittent network outages might be the reason. 

I am curious about how to solve this.
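As a general workaround for this symptom (not a fix for the underlying issue), callers can bound the wait instead of blocking indefinitely on the send future. The sketch below uses a plain `Future<String>` as a stand-in for the `Future<RecordMetadata>` returned by `KafkaProducer.send()`, with a hypothetical timeout value.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: a bounded get() lets the caller time out and handle the failure
// instead of parking forever on the latch seen in the thread dump above.
class BoundedGet {
    static String awaitSend(Future<String> sendFuture, long timeoutMs) {
        try {
            // bound the wait instead of calling the unbounded get()
            return sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // the send did not complete in time; caller decides what to do
            return "timed-out";
        } catch (InterruptedException | ExecutionException e) {
            return "failed";
        }
    }
}
```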

 

Following are the stack traces of the Java threads

 

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.77-b03 mixed mode):
"Attach Listener" #15081 daemon prio=9 os_prio=0 tid=0x7f9c50002000 
nid=0xe572 waiting on condition [0x]   java.lang.Thread.State: 
RUNNABLE
"pool-14658-thread-9" #15071 prio=5 os_prio=0 tid=0x7f9c9842f800 nid=0x397b 
waiting on condition [0x7f9c378fb000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007703e85b8> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
 at com.t100.sender.T100KafkaProducer.runProducer(T100KafkaProducer.java:104) 
at com.t100.sender.T100KafkaProducer.run(T100KafkaProducer.java:165) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)
"pool-14658-thread-8" #15070 prio=5 os_prio=0 tid=0x7f9c9842e000 nid=0x397a 
waiting on condition [0x7f9c379fc000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007704dabb0> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
 at com.t100.sender.T100KafkaProducer.runProducer(T100KafkaProducer.java:104) 
at com.t100.sender.T100KafkaProducer.run(T100KafkaProducer.java:165) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)
"pool-14658-thread-7" #15069 prio=5 os_prio=0 tid=0x7f9c9842d800 nid=0x3979 
waiting on condition [0x7f9c371f4000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007705ed590> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurren

[jira] [Updated] (KAFKA-10421) Kafka Producer deadlocked on get call

2020-08-19 Thread Ranadeep Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ranadeep Deb updated KAFKA-10421:
-
Description: 
I have been experiencing a similar issue in 2.3.0.

I have a multi-threaded application with each thread sending an individual 
message to the broker. There are instances where I have observed that the 
Producer threads get stuck on the Producer.send().get() call. I was not sure 
what was causing this issue, but after landing on this thread 
(https://issues.apache.org/jira/browse/KAFKA-8135) I suspect that 
intermittent network outages might be the reason. 

I am curious about how to solve this.

 

Following are the stack traces of the Java threads

 

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.77-b03 mixed mode):
 "Attach Listener" #15081 daemon prio=9 os_prio=0 tid=0x7f9c50002000 
nid=0xe572 waiting on condition [0x]   java.lang.Thread.State: 
RUNNABLE

"pool-14658-thread-9" #15071 prio=5 os_prio=0 tid=0x7f9c9842f800 nid=0x397b 
waiting on condition [0x7f9c378fb000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007703e85b8> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
 at com.t100.sender.T100KafkaProducer.runProducer(T100KafkaProducer.java:104) 
at com.t100.sender.T100KafkaProducer.run(T100KafkaProducer.java:165) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)

 

"pool-14658-thread-8" #15070 prio=5 os_prio=0 tid=0x7f9c9842e000 nid=0x397a 
waiting on condition [0x7f9c379fc000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007704dabb0> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
 at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
 at com.t100.sender.T100KafkaProducer.runProducer(T100KafkaProducer.java:104) 
at com.t100.sender.T100KafkaProducer.run(T100KafkaProducer.java:165) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)

 

"pool-14658-thread-7" #15069 prio=5 os_prio=0 tid=0x7f9c9842d800 nid=0x3979 
waiting on condition [0x7f9c371f4000]   java.lang.Thread.State: WAITING 
(parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  
<0x0007705ed590> (a java.util.concurrent.CountDownLatch$Sync) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
 at 
org.apache.kafka.clients.producer.

[GitHub] [kafka] carlos-verdes commented on pull request #9176: Allow replace all for RegexRouter

2020-08-19 Thread GitBox


carlos-verdes commented on pull request #9176:
URL: https://github.com/apache/kafka/pull/9176#issuecomment-676156385


   I will be happy to do it, but I may need help with the KIP.
   Do you need me to open a Jira ticket to move forward?







[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-19 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472947852



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   Ok, I think I follow now.  You are saying that we could potentially 
implement describe by invoking 1+N requests to Kafka: one to get the list of 
credentials (either the list of all of them if we are asking for them all, or 
the explicitly requested ones we wanted), and then another N requests to get 
the data for each one.  This on the surface seems like an anti-pattern, but it 
is not unreasonable for the case where the data is expensive to get in the 
first place — maybe we are forced to make 1 or more round-trips for each 
anyway.  So as a general, reusable pattern, yes, I believe it works.
   
   So when we invoke describe, whether it is describe-all or just an explicit 
few, we return a single future, and that future returns a list of instances 
(UserName in this case): either 1 instance for each user that has at least 1 
credential for the describe-all case, or one instance per distinct user 
explicitly requested otherwise.  Then each UserName instance has the accessor 
you mentioned, which in this case returns  
Future.
   
   Do I have that right?
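The shape being discussed can be sketched as follows (names are hypothetical and `CompletableFuture` stands in for `KafkaFuture`): one future resolves to the list of user names, and each name then maps to its own per-user future, which may issue a further request when first queried.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the 1+N describe pattern: one request for the user list,
// then one lazily-resolved future per user for the credential details.
class DescribeSketch {
    static Map<String, CompletableFuture<String>> describe(
            CompletableFuture<List<String>> users,
            Function<String, CompletableFuture<String>> perUserFetch) {
        // first request: the list of users that have credentials
        return users.join().stream()
                // N further requests, one per user, each behind its own future
                .collect(Collectors.toMap(u -> u, perUserFetch));
    }
}
```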









[GitHub] [kafka] rajinisivaram commented on pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-19 Thread GitBox


rajinisivaram commented on pull request #9142:
URL: https://github.com/apache/kafka/pull/9142#issuecomment-676279828


   @skaundinya15 Thanks for the PR, merging to trunk







[GitHub] [kafka] rajinisivaram merged pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-19 Thread GitBox


rajinisivaram merged pull request #9142:
URL: https://github.com/apache/kafka/pull/9142


   







 at 
org.apache.kafka.clients.producer.

[jira] [Updated] (KAFKA-10421) Kafka Producer deadlocked on get() call

2020-08-19 Thread Ranadeep Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ranadeep Deb updated KAFKA-10421:
-
Summary: Kafka Producer deadlocked on get() call  (was: Kafka Producer 
deadlocked on get call)

> Kafka Producer deadlocked on get() call
> ---
>
> Key: KAFKA-10421
> URL: https://issues.apache.org/jira/browse/KAFKA-10421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: CentOS7
>Reporter: Ranadeep Deb
>Priority: Critical
>
> I have been experiencing a similar issue in 2.3.0
> I have a multi threaded application with each thread sending an individual 
> message to the broker. There are instances where I have observed that the 
> Producer threads get stuck on the Producer.send().get() call. I was not sure 
> what was causing this issue but after landing on this thread 
> (https://issues.apache.org/jira/browse/KAFKA-8135) I am suspecting that 
> intermittent network outage might be the reason. 
> I am curious about how to solve this.
>  
> Following are the stack trace of the Java threads
>  

[jira] [Commented] (KAFKA-10421) Kafka Producer deadlocked on get() call

2020-08-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180563#comment-17180563
 ] 

Ismael Juma commented on KAFKA-10421:
-

Also, would you be able to upgrade to 2.5.1 and check if you still see the 
issue?

> Kafka Producer deadlocked on get() call
> ---
>
> Key: KAFKA-10421
> URL: https://issues.apache.org/jira/browse/KAFKA-10421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: CentOS7
>Reporter: Ranadeep Deb
>Priority: Critical
>
> I have been experiencing a similar issue in 2.3.0
> I have a multi threaded application with each thread sending an individual 
> message to the broker. There are instances where I have observed that the 
> Producer threads get stuck on the Producer.send().get() call. I was not sure 
> what was causing this issue but after landing on this thread 
> (https://issues.apache.org/jira/browse/KAFKA-8135) I am suspecting that 
> intermittent network outage might be the reason. 
> I am curious about how to solve this.
>  
> Following are the stack trace of the Java threads
>  

[jira] [Commented] (KAFKA-10421) Kafka Producer deadlocked on get() call

2020-08-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180562#comment-17180562
 ] 

Ismael Juma commented on KAFKA-10421:
-

What is the delivery.timeout.ms config?

> Kafka Producer deadlocked on get() call
> ---
>
> Key: KAFKA-10421
> URL: https://issues.apache.org/jira/browse/KAFKA-10421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: CentOS7
>Reporter: Ranadeep Deb
>Priority: Critical
>
> I have been experiencing a similar issue in 2.3.0
> I have a multi threaded application with each thread sending an individual 
> message to the broker. There are instances where I have observed that the 
> Producer threads get stuck on the Producer.send().get() call. I was not sure 
> what was causing this issue but after landing on this thread 
> (https://issues.apache.org/jira/browse/KAFKA-8135) I am suspecting that 
> intermittent network outage might be the reason. 
> I am curious about how to solve this.
>  
> Following are the stack trace of the Java threads
>  

[GitHub] [kafka] rhauch commented on pull request #9176: Allow replace all for RegexRouter

2020-08-19 Thread GitBox


rhauch commented on pull request #9176:
URL: https://github.com/apache/kafka/pull/9176#issuecomment-676385111


   Yes, please create an issue as described above. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10421) Kafka Producer deadlocked on get() call

2020-08-19 Thread Ranadeep Deb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180572#comment-17180572
 ] 

Ranadeep Deb commented on KAFKA-10421:
--

I don't change the delivery.timeout.ms values. These are the config values I use

 

ACKS_CONFIG -> "0"
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1
RETRIES_CONFIG -> 10
REQUEST_TIMEOUT_MS_CONFIG -> 1_000
RETRY_BACKOFF_MS_CONFIG -> 3000
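A common mitigation for the hang described in this report is to bound the blocking call rather than using a bare `get()`. Below is a minimal stdlib-only sketch (it deliberately does not use the Kafka client; the never-completing future stands in for a `send()` whose `ProduceRequestResult` is never finished):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGetSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for producer.send(record): a future that never completes,
        // mimicking the parked threads in the dumps above.
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            // get(timeout, unit) unblocks the caller even if the result
            // never arrives; a bare get() would park the thread indefinitely.
            pending.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out instead of parking forever");
        }
    }
}
```

The producer's `Future<RecordMetadata>` exposes the same timed `get(long, TimeUnit)` overload, so the calling thread can fail fast and retry or log instead of waiting on the `CountDownLatch` forever.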

> Kafka Producer deadlocked on get() call
> ---
>
> Key: KAFKA-10421
> URL: https://issues.apache.org/jira/browse/KAFKA-10421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: CentOS7
>Reporter: Ranadeep Deb
>Priority: Critical
>
> I have been experiencing a similar issue in 2.3.0
> I have a multi threaded application with each thread sending an individual 
> message to the broker. There are instances where I have observed that the 
> Producer threads get stuck on the Producer.send().get() call. I was not sure 
> what was causing this issue but after landing on this thread 
> (https://issues.apache.org/jira/browse/KAFKA-8135) I am suspecting that 
> intermittent network outage might be the reason. 
> I am curious about how to solve this.
>  
> Following are the stack trace of the Java threads
>  

[GitHub] [kafka] ijuma commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


ijuma commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676427721


   @cmccabe There's a checkstyle error, it seems.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

2020-08-19 Thread GitBox


vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r473068515



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.rocksdb.RocksIterator;
-
-import java.util.Set;
-
-class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
   yes, it looks like it was. Thanks for cleaning it up!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-19 Thread GitBox


lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r473075940



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -185,10 +192,10 @@ public void close() {
 deleteAllTopics(backup.kafka());
 primary.stop();
 backup.stop();
+assertFalse(exited.get());
+Exit.resetExitProcedure();

Review comment:
   @rajinisivaram good point. I'll fix that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
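The review point above — restoring the exit procedure even when an assertion in the test body fails — can be sketched with a plain settable handler (the names here are illustrative stand-ins, not Kafka's actual `Exit` utility):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

public class ExitGuardSketch {
    // Illustrative stand-in for a static, swappable exit procedure.
    static IntConsumer exitProcedure = System::exit;

    public static void main(String[] args) {
        AtomicBoolean exited = new AtomicBoolean(false);
        exitProcedure = status -> exited.set(true); // capture instead of exiting
        try {
            exitProcedure.accept(1); // the test body triggers an "exit" here
            // ... assertions would run here and may throw ...
        } finally {
            // Restore in finally so a failing assertion cannot leak the
            // overridden procedure into tests that run afterwards.
            exitProcedure = System::exit;
        }
        System.out.println(exited.get()); // true
    }
}
```

Placing the reset in a `finally` (or an `@After` method) keeps later tests from inheriting the no-op exit handler.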




[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2020-08-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8940:

Fix Version/s: (was: 2.5.0)
   2.6.1
   2.5.2
   2.7.0

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.7.0, 2.5.2, 2.6.1
>
>
> I lost the screen shot unfortunately... it reports the set of expected 
> records does not match the received records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-19 Thread GitBox


vvcephei commented on pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#issuecomment-676482308


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-19 Thread GitBox


lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r473110543



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -185,10 +192,10 @@ public void close() {
 deleteAllTopics(backup.kafka());
 primary.stop();
 backup.stop();
+assertFalse(exited.get());
+Exit.resetExitProcedure();

Review comment:
   I've pushed a fix. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text

2020-08-19 Thread GitBox


rondagostino commented on pull request #9199:
URL: https://github.com/apache/kafka/pull/9199#issuecomment-676491706


   Note that KIP-377 landed in 2.2 and explicitly stated in the new code that 
altering topic configurations via `kafka-topics` was deprecated – even using 
the `--zookeeper` flag (setting topic configs using `kafka-topics` when using 
`--create` to create a topic is supported and works with --bootstrap-server; 
it’s just altering/deleting topic configs that was deprecated via `--zookeeper` 
and that never worked/never will work via `--bootstrap-server`).  Therefore, 
potentially, this could be backported as far back as 2.2.x.
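For readers following along: post-creation topic config changes go through the `kafka-configs` tool rather than `kafka-topics`. A sketch of the supported invocation (the broker address and topic name are illustrative):

```shell
# Alter a topic config after creation via kafka-configs.sh,
# not kafka-topics.sh:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config retention.ms=86400000
```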



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text

2020-08-19 Thread GitBox


rondagostino commented on pull request #9199:
URL: https://github.com/apache/kafka/pull/9199#issuecomment-676492765


   JDK 11 build failure was an infrastructure issue.  JDK 8 test failure is an 
unrelated flaky test 
(`org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once]`).
  JDK 14 build was clean.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-19 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-10414:
-
Description: 
There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs.

-Can be fixed by upgrading to at least version 2.0.0.AM25-

Since api-all is also a dependency, and there is a class collision between 
api-all and newer version of api-util, it is better to just upgrade api-util to 
1.0.2

  was:
There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<=

This is a transitive dependency through the apacheds libs. Can be fixed by 
upgrading to at least version 2.0.0.AM25
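One way such a transitive pin could be expressed in a Gradle build — a hedged sketch using the coordinates from the description above; the exact block depends on how Kafka's build wires its dependencies:

```groovy
// Force the patched transitive dependency instead of upgrading apacheds,
// avoiding the api-all/api-util class collision mentioned above.
configurations.all {
    resolutionStrategy.force 'org.apache.directory.api:api-util:1.0.2'
}
```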


> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs.
> -Can be fixed by upgrading to at least version 2.0.0.AM25-
> Since api-all is also a dependency, and there is a class collision between 
> api-all and newer version of api-util, it is better to just upgrade api-util 
> to 1.0.2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676507610


   Another Jenkins problem.
   ```
   08:36:34  > git fetch --tags --progress -- git://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/* +refs/pull/*:refs/remotes/origin/pr/*
   08:37:17 FATAL: java.io.IOException: Unexpected termination of the channel
   08:37:17 java.io.EOFException
   08:37:17 at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2681)
   08:37:17 at 
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676507773


   retest this please







[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

2020-08-19 Thread GitBox


vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r473071200



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##
@@ -58,7 +62,7 @@ public synchronized boolean hasNext() {
 return allDone();
 } else {
 next = getKeyValue();
-iter.next();
+advanceIterator.accept(iter);

Review comment:
   clever!
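The change under review swaps a hard-coded `iter.next()` for an injected `advanceIterator` consumer, so the same iterator wrapper can traverse forward or backward. A plain-Java sketch of that pattern (class and method names here are illustrative assumptions, not the Kafka classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Function;
import java.util.function.Predicate;

public class DirectionalCursorDemo {

    // The wrapper hard-codes neither hasNext() nor next(); the traversal
    // direction is injected as functions, so one class serves both directions.
    static class DirectionalCursor<T> {
        private final ListIterator<T> iter;
        private final Predicate<ListIterator<T>> hasMore;   // hasNext vs. hasPrevious
        private final Function<ListIterator<T>, T> advance; // next vs. previous

        DirectionalCursor(ListIterator<T> iter, boolean reverse) {
            this.iter = iter;
            this.hasMore = reverse ? ListIterator::hasPrevious : ListIterator::hasNext;
            this.advance = reverse ? ListIterator::previous : ListIterator::next;
        }

        boolean hasNext() { return hasMore.test(iter); }
        T next() { return advance.apply(iter); }
    }

    static <T> List<T> drain(DirectionalCursor<T> cursor) {
        List<T> out = new ArrayList<>();
        while (cursor.hasNext()) {
            out.add(cursor.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        // Forward: start at index 0, advance with next()
        System.out.println(drain(new DirectionalCursor<>(values.listIterator(), false)));       // [1, 2, 3]
        // Reverse: start past the end, advance with previous()
        System.out.println(drain(new DirectionalCursor<>(values.listIterator(values.size()), true))); // [3, 2, 1]
    }
}
```

The design choice the reviewer calls "clever" is exactly this: the caller decides the direction once at construction, and the shared traversal logic never branches again.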









[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-19 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r473158779



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
##
@@ -76,6 +77,16 @@ public AlterClientQuotasRequest build(short version) {
 public String toString() {
 return data.toString();
 }
+
+@Override
+public boolean equals(Object other) {

Review comment:
   Add equality check for the sake of easymock verification
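The point of the review note is that mock frameworks such as EasyMock compare expected and actual call arguments with `equals()`, so a request class without a value-based `equals()` can only be matched by reference. A hedged, standalone illustration (the `QuotaRequest` class below is hypothetical, not the Kafka request type):

```java
import java.util.Objects;

public class QuotaRequest {
    private final String entity;
    private final double limit;

    QuotaRequest(String entity, double limit) {
        this.entity = entity;
        this.limit = limit;
    }

    // Value-based equality: two structurally identical requests compare equal,
    // which is what a mock expectation needs to verify its argument.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof QuotaRequest)) return false;
        QuotaRequest that = (QuotaRequest) other;
        return Double.compare(limit, that.limit) == 0 && entity.equals(that.entity);
    }

    // equals() and hashCode() should always be overridden together.
    @Override
    public int hashCode() {
        return Objects.hash(entity, limit);
    }

    public static void main(String[] args) {
        System.out.println(new QuotaRequest("user1", 1.0).equals(new QuotaRequest("user1", 1.0))); // true
    }
}
```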









[jira] [Updated] (KAFKA-10419) KAFKA BROKER Shuts down when a topic is deleted manually from command line on Windows 10 operating System.

2020-08-19 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10419:
--
Component/s: (was: KafkaConnect)
 core

> KAFKA BROKER Shuts down when a topic is deleted manually from command line on 
> Windows 10 operating System.
> --
>
> Key: KAFKA-10419
> URL: https://issues.apache.org/jira/browse/KAFKA-10419
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
> Environment: Windows 10 Operating System
>Reporter: Ajay Kapoor
>Priority: Major
>  Labels: Windows-10
>
> KAFKA VERSION: KAFKA_2.13-2.6.0
> Delete of topic on Windows causes kafka broker shutdown:
> [2020-08-18 15:18:22,858] INFO [ReplicaAlterLogDirsManager on broker 0] 
> Removed fetcher for partitions Set(quickstart-events-0) 
> (kafka.server.ReplicaAlterLogDirsManager)
> [2020-08-18 15:18:22,899] ERROR Error while renaming dir for 
> quickstart-events-0 in log dir C:\tmp\kafka-logs 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\quickstart-events-0 -> 
> C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete
>     at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
>     at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>     at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>     at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>     at java.base/java.nio.file.Files.move(Files.java:1425)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:913)
>     at kafka.log.Log.$anonfun$renameDir$2(Log.scala:981)
>     at kafka.log.Log.renameDir(Log.scala:2340)
>     at kafka.log.LogManager.asyncDelete(LogManager.scala:935)
>     at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:470)
>     at kafka.cluster.Partition.delete(Partition.scala:461)
>     at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:344)
>     at 
> kafka.server.ReplicaManager.$anonfun$stopReplicas$9(ReplicaManager.scala:448)
>     at scala.collection.mutable.HashMap$Node.foreach(HashMap.scala:587)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:475)
>     at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:445)
>     at 
> kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:252)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
>     at java.base/java.lang.Thread.run(Thread.java:830)
>     Suppressed: java.nio.file.AccessDeniedException: 
> C:\tmp\kafka-logs\quickstart-events-0 -> 
> C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete
>     at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
>     at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>     at 
> java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>     at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>     at java.base/java.nio.file.Files.move(Files.java:1425)
>     at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:910)
>     ... 14 more
> [2020-08-18 15:18:22,914] WARN [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)
> [2020-08-18 15:18:22,930] WARN [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  and stopped moving logs for partitions  because they 
> are in the failed log directory C:\tmp\kafka-logs. 
> (kafka.server.ReplicaManager)
> [2020-08-18 15:18:22,932] WARN Stopping serving logs in dir C:\tmp\kafka-logs 
> (kafka.log.LogManager)
> [2020-08-18 15:18:22,946] ERROR Shutdown broker because all log dirs in 
> C:\tmp\kafka-logs 
>  have failed (kafka.log.LogManager)
> How to reproduce::
> 1. Start Zookeeper on windows
> >bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> 2. Start Kafka Broker on Windows
> >bin\windows\kafka-server-start.bat config\server.properties
> 3. Create a Topic on Kafka Broker
> >bin\windows\kafka-topics.bat --create --topic quickstart-events 
> >--bootstrap-server localhost:9092
> 4. Delete the Kafka Topic created above.
> >bin\windows\kafka-topics.bat --delete --topic quickstart-events 
> >--bootstrap-server localhost:9092




[GitHub] [kafka] gardnervickers opened a new pull request #9201: MINOR: Increase the amount of time available to the `test_verifiable_producer` test

2020-08-19 Thread GitBox


gardnervickers opened a new pull request #9201:
URL: https://github.com/apache/kafka/pull/9201


   Increase the amount of time available to the `test_verifiable_producer` test 
to login and get the process name for the verifiable producer from 5 seconds to 
10 seconds.
   
   We were seeing some test failures due to the assertion failing because the 
verifiable producer would complete before we could log in, list the processes, 
and parse out the producer version. Previously, we were giving this operation 5 
seconds to run; this PR bumps it up to 10 seconds. 
   
   I verified locally that this does not flake, but even at 5 seconds I wasn't 
seeing any flakes. Ultimately we should find a better strategy than racing to 
query the producer process (as outlined in the existing comments). 







[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676589017


   I verified that this removed the dependency by doing this:
   ```
   ./gradlew install -x check -x test
   find | grep jackson | grep jar$ | xargs -l rm
   ```
   And then testing the console consumer and console producer continued to work.
   
   (Since the broker does require jackson to run I was using a prebuilt kafka 
to run the clients against).
   
   Without this PR, the console consumer fails with this exception when the 
Jackson jars are not present:
   ```
   [2020-08-19 11:27:53,468] ERROR Error processing message, terminating 
consumer process:  (kafka.tools.ConsoleConsumer$)
   java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/JsonNode
   at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendFindCoordinatorRequest(AbstractCoordinator.java:787)
   ```







[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676589352


   The failed test is flaky.  Committing now.  Thanks for the review







[GitHub] [kafka] cmccabe merged pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


cmccabe merged pull request #9197:
URL: https://github.com/apache/kafka/pull/9197


   







[GitHub] [kafka] ijuma commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-19 Thread GitBox


ijuma commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-676594223


   Sounds great, thanks!







[GitHub] [kafka] cmccabe commented on pull request #9194: KAFKA-10384: Separate converters from generated messages

2020-08-19 Thread GitBox


cmccabe commented on pull request #9194:
URL: https://github.com/apache/kafka/pull/9194#issuecomment-676596370


   I tested that this works to allow console consumer to run without a jackson 
dependency.







[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180775#comment-17180775
 ] 

Guozhang Wang commented on KAFKA-10134:
---

I think I'd need more information to further investigate this issue. [~zhowei] 
could you apply https://github.com/apache/kafka/pull/9038 only (this is for 
improved log4j entries) on top of 2.6 to reproduce this issue and then upload 
the enhanced log file?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running tasks.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible with both the 
> new CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high 
> CPU usage dropped after the rebalance was done. But when using the cooperative 
> one, it seems the consumer threads are stuck on something and couldn't 
> finish the rebalance, so the high CPU usage won't drop until we stopped our 
> load. Also, a small load without long running tasks won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable 
> [0x7fe119aab000]
> java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  





[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-19 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r473276446



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {
+@Override
+public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+return new DescribeUserScramCredentialsRequest.Builder(
+new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+}
+
+@Override
+public void handleResponse(AbstractResponse abstractResponse) {
+DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+Errors error = Errors.forCode(response.data().error());
+switch (error) {
+case NONE:
+DescribeUserScramCredentialsResponseData data = 
response.data();
+
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+userScramCredential -> {
+List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+.collect(Collectors.toList());
+return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+})));
+break;
+case NOT_CONTROLLER:
+handleNotControllerError(error);
+break;
+default:
+future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+break;
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+};
+runnable.call(call, now);
+return new DescribeUserScramCredentialsResult(future);
+}
+
+@Override
+public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+ 
AlterUserScramCredentialsOptions options) {
+final long now = time.milliseconds();
+final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+for (UserScramCredentialAlteration alteration: alterations) {
+futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+}
+final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
+// We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+ScramMechanism mechanism = deletion.getMechanism();
+if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+userIllegalAlterationExceptions.put(deletion.getUser(), new 
IllegalArgumentException("Unknown SCRAM mechanism"));

Review comment:
   ok









[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-19 Thread GitBox


abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-676628747


   retest this please







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-19 Thread GitBox


abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-676632413


   retest this please
   
   







[jira] [Commented] (KAFKA-10410) OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place

2020-08-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180811#comment-17180811
 ] 

Guozhang Wang commented on KAFKA-10410:
---

Hello [~markshelton], thanks for reporting this.

Yes this is a by-design fix to decouple the stateRestoreCallback (which is used 
for just applying records read from changelogs to state stores, per store) from 
stateRestoreListener (used for indicating the beginning / end / etc during 
restoration, global) since originally they were merged in a hacky way to just 
enable certain RocksDB optimizations, which turned out to have other side effects.

Could you let me know why you cannot set the state restore listener through 
`KafkaStreams#setGlobalStateRestoreListener`? Maybe I can try to figure out 
something to unblock you.

> OnRestoreStart disappeared from StateRestoreCallback  in 2.6.0 and reappeared 
> in a useless place
> 
>
> Key: KAFKA-10410
> URL: https://issues.apache.org/jira/browse/KAFKA-10410
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Mark Shelton
>Priority: Blocker
>
> In version 2.5.0 and earlier there are "onRestoreStart" and "onRestoreEnd" 
> methods on StateRestoreCallback.
> Version 2.6.0 removed these calls and put them into StateRestoreListener and 
> requires "streaming.setGlobalStateRestoreListener".
> This makes it impossible for the actual StateRestoreCallback implementation 
> to receive the start and end indication and is blocking me from moving to 
> 2.6.0.
> See:
> [https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.html]
>  
> Related JIRA:
> https://issues.apache.org/jira/browse/KAFKA-4322 





[jira] [Commented] (KAFKA-10380) Make dist flatten rocksdbjni

2020-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180878#comment-17180878
 ] 

Matthias J. Sax commented on KAFKA-10380:
-

[~adriancole] – when you say "Kafka image" what do you exactly mean? RocksDB 
should only be a dependency for Kafka Streams, but not for brokers or connect.

> Make dist flatten rocksdbjni
> 
>
> Key: KAFKA-10380
> URL: https://issues.apache.org/jira/browse/KAFKA-10380
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Affects Versions: 2.6.0
>Reporter: Adrian Cole
>Priority: Major
>
> I was looking for ways to reduce the size of our Kafka image, and the most 
> notable opportunity is handling rocksdbjni differently. It is currently a 
> 15MB jar.
> As mentioned in its description rocksdbjni includes binaries for a lot of OS 
> choices.
> du -k librocksdbjni-*
> 7220  librocksdbjni-linux-aarch64.so
> 8756  librocksdbjni-linux-ppc64le.so
> 7220  librocksdbjni-linux32.so
> 7932  librocksdbjni-linux64.so
> 5440  librocksdbjni-osx.jnilib
> 4616  librocksdbjni-win64.dll
> It may not seem obvious in normal dists, which aim to work for many operating 
> systems, why this is a problem. When creating Docker images, we currently 
> would need to repackage this to scrub out the irrelevant OS items or accept 
> files larger than Alpine itself.
> While this might be something to kick back to RocksDB, having some options 
> here would be great.





[jira] [Commented] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180881#comment-17180881
 ] 

Matthias J. Sax commented on KAFKA-10395:
-

[~ableegoldman] [~vvcephei] Seems the PR was merged. Can we close the ticket?

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
> Fix For: 2.7.0
>
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = 
> outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
> if (!processorTopology.sinkTopics().contains(topicName)) {
> throw new IllegalArgumentException("Unknown topic: " + topicName); 
> } 
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  
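A minimal pure-Java model of the check described above (an illustration under assumed names, not the actual driver code): a topic that received no output and is also not a registered sink is indistinguishable from a typo, so the lookup throws.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReadCheckSketch {
    // Mirrors the quoted logic: null output + unregistered topic => error;
    // null output + registered sink => simply no records yet.
    static List<String> read(Map<String, List<String>> outputByTopic,
                             Set<String> sinkTopics, String topic) {
        List<String> records = outputByTopic.get(topic);
        if (records == null) {
            if (!sinkTopics.contains(topic)) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return Collections.emptyList();
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, List<String>> out = new HashMap<>();
        Set<String> sinks = new HashSet<>(Collections.singleton("static-sink"));
        // Registered sink with no output yet: empty result, no error.
        System.out.println(read(out, sinks, "static-sink").size()); // 0
        try {
            // A dynamically routed topic is unknown to the topology, so this
            // throws even though it is a perfectly valid output topic.
            read(out, sinks, "dynamic-topic");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unknown topic: dynamic-topic
        }
    }
}
```

This is exactly why registering possible output topics up front, as the ticket proposes, would let the driver keep the useful "wrong topic name" warning without breaking dynamic routing.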





[jira] [Assigned] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached

2020-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10396:
---

Assignee: Rohan Desai

> Overall memory of container keep on growing due to kafka stream / rocksdb and 
> OOM killed once limit reached
> ---
>
> Key: KAFKA-10396
> URL: https://issues.apache.org/jira/browse/KAFKA-10396
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.5.0
>Reporter: Vagesh Mathapati
>Assignee: Rohan Desai
>Priority: Critical
> Attachments: CustomRocksDBConfig.java, MyStreamProcessor.java, 
> kafkaStreamConfig.java
>
>
> We are observing that the overall memory of our container keeps on growing and 
> never comes down.
> After analysis we found that rocksdbjni.so keeps allocating 64M chunks 
> of memory off-heap and never releases them. This causes an OOM kill after memory 
> reaches the configured limit.
> We use Kafka stream and globalktable for our many kafka topics.
> Below is our environment
>  * Kubernetes cluster
>  * openjdk 11.0.7 2020-04-14 LTS
>  * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS)
>  * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode)
>  * Springboot 2.3
>  * spring-kafka-2.5.0
>  * kafka-streams-2.5.0
>  * kafka-streams-avro-serde-5.4.0
>  * rocksdbjni-5.18.3
> Observed same result with kafka 2.3 version.
> Below is the snippet of our analysis
> from pmap output we took addresses from these 64M allocations (RSS)
> Address Kbytes RSS Dirty Mode Mapping
> 7f3ce800 65536 65532 65532 rw--- [ anon ]
> 7f3cf400 65536 65536 65536 rw--- [ anon ]
> 7f3d6400 65536 65536 65536 rw--- [ anon ]
> We tried to match with memory allocation logs enabled with the help of Azul 
> systems team.
> @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ff7ca0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4]
>  - 0x7f3ce8ff9780
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ff9750
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ff97c0
>  @ 
> /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ffccf0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ffcd10
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ffccf0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ffcd10
> We also identified that content on this 64M is just 0s and no any data 
> present in it.
> I tried to tune the RocksDB configuration as mentioned but it did not help. 
> [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  
> Please let me know if you need any more details





[jira] [Updated] (KAFKA-10408) Calendar based windows

2020-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10408:

Labels: needs-kip  (was: )

> Calendar based windows
> --
>
> Key: KAFKA-10408
> URL: https://issues.apache.org/jira/browse/KAFKA-10408
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Antony Stubbs
>Priority: Minor
>  Labels: needs-kip
>
> A date based window, for example aggregate all payments made until each month 
> date of the 15th, or all payments made each year until April 1st.
> Should handle time zones "properly", e.g. allow user to specify which time 
> zone to base it on.
> Also should support setting a time cut off, and just simply "midnight" in the 
> given zone (e.g. 6pm April 15th). 
> Ideally will also support day offsets, e.g. last day of every month, first 
> Tuesday of each week, last Friday of the month.
> Example implementation of a specific aggregator, with a window implementation 
> implicitly embedded:
> [https://github.com/astubbs/ks-tributary/blob/denormalisation-base-cp-libs/streams-module/src/main/java/io/confluent/ps/streams/processors/YearlyAggregator.java]
>  
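As a rough illustration of the windowing arithmetic such a feature needs (plain `java.time`; all names are assumptions for the sketch, unrelated to any Kafka API), here is the "until a given day of each month, at midnight in a given zone" case:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class MonthlyWindowSketch {
    // Returns the end of the window containing `event`: midnight (in the
    // event's zone) of the next occurrence of `dayOfMonth`. Assumes
    // dayOfMonth is valid for every month (e.g. 1-28, or 15 as below).
    static ZonedDateTime nextWindowEnd(ZonedDateTime event, int dayOfMonth) {
        LocalDate date = event.toLocalDate();
        LocalDate cutoff = date.withDayOfMonth(dayOfMonth);
        if (!date.isBefore(cutoff)) {
            // On or after the cutoff day: roll over to next month's cutoff.
            cutoff = cutoff.plusMonths(1);
        }
        // atStartOfDay handles the zone's DST rules for "midnight".
        return cutoff.atStartOfDay(event.getZone());
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Europe/Berlin");
        ZonedDateTime event = ZonedDateTime.of(LocalDateTime.of(2020, 8, 19, 10, 30), zone);
        System.out.println(nextWindowEnd(event, 15)); // 2020-09-15T00:00+02:00[Europe/Berlin]
    }
}
```

Anchoring the cutoff in a `ZoneId` rather than a fixed UTC offset is what makes the window "handle time zones properly" across DST transitions, as the ticket asks.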





[jira] [Commented] (KAFKA-10408) Calendar based windows

2020-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180882#comment-17180882
 ] 

Matthias J. Sax commented on KAFKA-10408:
-

Another example implementation is this one: 
[https://github.com/confluentinc/kafka-streams-examples/blob/5.5.0-post/src/test/java/io/confluent/examples/streams/window/DailyTimeWindows.java]

> Calendar based windows
> --
>
> Key: KAFKA-10408
> URL: https://issues.apache.org/jira/browse/KAFKA-10408
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Antony Stubbs
>Priority: Minor
>  Labels: needs-kip
>
> A date-based window, for example aggregating all payments made each month
> until the 15th, or all payments made each year until April 1st.
> Should handle time zones "properly", e.g. allow the user to specify which time
> zone to base it on.
> Also should support setting a time cut-off, not just simply "midnight" in the
> given zone (e.g. 6pm April 15th).
> Ideally will also support day offsets, e.g. last day of every month, first 
> Tuesday of each week, last Friday of the month.
> Example implementation of a specific aggregator, with a window implementation 
> implicitly embedded:
> [https://github.com/astubbs/ks-tributary/blob/denormalisation-base-cp-libs/streams-module/src/main/java/io/confluent/ps/streams/processors/YearlyAggregator.java]
>  





[jira] [Issue Comment Deleted] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10395:

Comment: was deleted

(was: [~ableegoldman] [~vvcephei] Seems the PR was merged. Can we close the 
ticket?)

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
> Fix For: 2.7.0
>
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords =
> outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  
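The registration idea in the last paragraph can be sketched with plain collections; `registerOutputTopic` and `readRecords` below are hypothetical names for illustration, not TopologyTestDriver API.

```java
import java.util.*;

// Hypothetical sketch of the proposal: the test driver is told up front
// which output topics may receive records (including dynamically routed
// ones), so reading an empty-but-registered topic returns no records
// instead of throwing, while a truly unknown topic still fails fast.
public class OutputTopicRegistry {
    private final Set<String> registeredOutputTopics = new HashSet<>();
    private final Map<String, Queue<String>> outputRecordsByTopic = new HashMap<>();

    void registerOutputTopic(String topic) {
        registeredOutputTopics.add(topic);
    }

    void produce(String topic, String record) {
        outputRecordsByTopic.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(record);
    }

    List<String> readRecords(String topic) {
        Queue<String> queue = outputRecordsByTopic.get(topic);
        if (queue == null) {
            if (!registeredOutputTopics.contains(topic)) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return List.of(); // registered, but nothing has been routed there yet
        }
        return new ArrayList<>(queue);
    }
}
```

This keeps the wrong-topic-name safety check while tolerating dynamically routed topics that have not yet received output.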





[jira] [Assigned] (KAFKA-8812) Rebalance Producers

2020-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8812:
--

Assignee: (was: Matthias J. Sax)

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>  Labels: kip
>
> [KIP-509: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
> Please bear with me. Initially this thought sounds stupid, but it has its
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with a keep-alive packet, a rebalance happens,
> informing the remaining active producers to take more load, and the dead
> producer will get an error when it sends data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  
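The proposed assignment step can be illustrated with a plain range-assignment function, analogous to what consumers already get from Kafka; this is a hypothetical sketch of the idea in the ticket, not an existing Kafka API.

```java
import java.util.*;

// Hypothetical sketch of the proposed producer-side rebalance: given the
// number of source partitions and the number of live producer instances,
// assign each instance a contiguous range, exactly as described above
// (10 partitions, 2 producers -> 1-5 and 6-10).
public class ProducerRangeAssignment {

    // Returns the 1-based partition numbers assigned to `instanceIndex`
    // (0-based) out of `instanceCount` live producers.
    static List<Integer> assign(int partitions, int instanceCount, int instanceIndex) {
        List<Integer> result = new ArrayList<>();
        int base = partitions / instanceCount;
        int extra = partitions % instanceCount; // first `extra` instances get one more
        int start = instanceIndex * base + Math.min(instanceIndex, extra) + 1;
        int size = base + (instanceIndex < extra ? 1 : 0);
        for (int p = start; p < start + size; p++) {
            result.add(p);
        }
        return result;
    }
}
```

On a producer joining or dying, the broker would rerun this with the new instance count and push the fresh ranges (plus user-supplied restart pointers) to the survivors.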





[jira] [Commented] (KAFKA-10380) Make dist flatten rocksdbjni

2020-08-19 Thread Adrian Cole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180886#comment-17180886
 ] 

Adrian Cole commented on KAFKA-10380:
-

I'm referring to the binary dist (produced by the build). The problem is that
this lib is mixed in with the others, so it is hard to know which jars are
optional for brokers or Connect. I can make a special comment in our file about
this one, but maybe a documentation or directory naming convention could be
used to identify what's not required for basic Kafka functionality (vs.
Streams). Since this seems a special case, it is OK to just add a comment; I'm
just mentioning that there's no way for a passerby to know this is only needed
for Streams. There may be other larger jars also unused at runtime, so if you
know of others, let me know as well!

> Make dist flatten rocksdbjni
> 
>
> Key: KAFKA-10380
> URL: https://issues.apache.org/jira/browse/KAFKA-10380
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Affects Versions: 2.6.0
>Reporter: Adrian Cole
>Priority: Major
>
> I was looking for ways to reduce the size of our Kafka image, and the most 
> notable opportunity is handling rocksdbjni differently. It is currently a 
> 15MB jar.
> As mentioned in its description rocksdbjni includes binaries for a lot of OS 
> choices.
> du -k librocksdbjni-*
> 7220  librocksdbjni-linux-aarch64.so
> 8756  librocksdbjni-linux-ppc64le.so
> 7220  librocksdbjni-linux32.so
> 7932  librocksdbjni-linux64.so
> 5440  librocksdbjni-osx.jnilib
> 4616  librocksdbjni-win64.dll
> It may not seem obvious in normal dists, which aim to work for many operating
> systems, why this is a problem. When creating Docker images, we currently
> would need to repackage this to scrub out the irrelevant OS items or accept
> files larger than Alpine itself.
> While this might be something to kick back to RocksDB, having some options
> here would be great.
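The repackaging step described above (scrubbing foreign-OS native libs from the rocksdbjni jar before baking an image) can be sketched with only the JDK: open the jar as a zip filesystem and delete the entries you don't need. The file names follow the listing above; the linux64-only keep-list is an assumption for a Linux image.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.zip.*;

// Sketch: strip native libraries for other operating systems out of a
// rocksdbjni-style jar, keeping only the ones on a keep-list. Uses the
// JDK's zip FileSystem, so no external zip/unzip tooling is required.
public class JarSlimmer {

    static void keepOnly(Path jar, Set<String> keep) throws IOException {
        try (FileSystem fs = FileSystems.newFileSystem(jar, (ClassLoader) null)) {
            List<Path> toDelete = new ArrayList<>();
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(fs.getPath("/"))) {
                for (Path entry : entries) {
                    String name = entry.getFileName().toString();
                    // Mark top-level native libs that are not on the keep-list.
                    if (name.startsWith("librocksdbjni") && !keep.contains(name)) {
                        toDelete.add(entry);
                    }
                }
            }
            for (Path p : toDelete) {
                Files.delete(p); // changes are written back when fs closes
            }
        }
    }

    // Helper for the demo: build a tiny jar with the given entry names.
    static Path sampleJar(List<String> entryNames) throws IOException {
        Path jar = Files.createTempFile("rocksdbjni-sample", ".jar");
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(jar))) {
            for (String name : entryNames) {
                out.putNextEntry(new ZipEntry(name));
                out.write(new byte[]{0});
                out.closeEntry();
            }
        }
        return jar;
    }
}
```

Running `keepOnly(jar, Set.of("librocksdbjni-linux64.so"))` in a Docker build stage would drop the aarch64/ppc64le/linux32/osx/win64 binaries and most of the 15MB.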





[jira] [Commented] (KAFKA-10380) Make dist flatten rocksdbjni

2020-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180889#comment-17180889
 ] 

Matthias J. Sax commented on KAFKA-10380:
-

Thanks for clarifying. I am sure that there are other libraries in the binary 
dist that are not required for the brokers and I understand that for a 
"minimal" broker image it's not easy to know which one you need and which ones 
could be omitted. – Not sure atm how to address this in the most 
straightforward way. \cc [~ijuma] and/or [~ewencp] might have an opinion about 
this question.

> Make dist flatten rocksdbjni
> 
>
> Key: KAFKA-10380
> URL: https://issues.apache.org/jira/browse/KAFKA-10380
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Affects Versions: 2.6.0
>Reporter: Adrian Cole
>Priority: Major
>
> I was looking for ways to reduce the size of our Kafka image, and the most 
> notable opportunity is handling rocksdbjni differently. It is currently a 
> 15MB jar.
> As mentioned in its description rocksdbjni includes binaries for a lot of OS 
> choices.
> du -k librocksdbjni-*
> 7220  librocksdbjni-linux-aarch64.so
> 8756  librocksdbjni-linux-ppc64le.so
> 7220  librocksdbjni-linux32.so
> 7932  librocksdbjni-linux64.so
> 5440  librocksdbjni-osx.jnilib
> 4616  librocksdbjni-win64.dll
> It may not seem obvious in normal dists, which aim to work for many operating
> systems, why this is a problem. When creating Docker images, we currently
> would need to repackage this to scrub out the irrelevant OS items or accept
> files larger than Alpine itself.
> While this might be something to kick back to RocksDB, having some options
> here would be great.





[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached

2020-08-19 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180897#comment-17180897
 ] 

Rohan Desai commented on KAFKA-10396:
-

> I mean to say, if my topic is going to have 10 million records, then what
> will be the value of Cache and writeBufferManager? And if the topic is going
> to have just 5k records, then what will be the value of Cache and
> writeBufferManager?

 

You can try to size your cache/wbm according to the available resources. So if
you have X GB of memory on your system, choose some value significantly less
than that and allocate it to the cache/wbm to bound memory usage from RocksDB.
Remember, even if you choose a cache size that's smaller than it could be, the
OS will still cache data for you (it's just a bit more expensive to access it).
So err on the side of choosing something smaller. FWIW we use ~20% of total
memory for the cache (so 6GB out of 30). Then that, plus the size of your Java
heap, should give you the total memory usage from Java and RocksDB.
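The sizing heuristic above can be written down as a tiny helper; the 20% fraction is just the value quoted in this comment, not a Kafka default.

```java
// Sketch of the sizing heuristic described above: give the shared RocksDB
// cache / write buffer manager a fixed fraction of total system memory
// (the ~20% figure is the one quoted in this thread, not a Kafka default),
// and estimate the overall footprint as heap + RocksDB budget.
public class RocksDbMemoryBudget {

    static long cacheBytes(long totalMemoryBytes, double fraction) {
        return (long) (totalMemoryBytes * fraction);
    }

    static long estimatedTotalBytes(long heapBytes, long rocksDbBudgetBytes) {
        return heapBytes + rocksDbBudgetBytes;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        long total = 30 * gib;                // 30 GB machine, as in the example
        long cache = cacheBytes(total, 0.20); // 20% of total, i.e. 6 GB for cache/wbm
        System.out.println(cache / gib + " GiB RocksDB budget");
    }
}
```

The resulting byte count would then be passed to a `RocksDBConfigSetter` that creates one shared `Cache`/`WriteBufferManager` for all stores.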

> Overall memory of container keep on growing due to kafka stream / rocksdb and 
> OOM killed once limit reached
> ---
>
> Key: KAFKA-10396
> URL: https://issues.apache.org/jira/browse/KAFKA-10396
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.5.0
>Reporter: Vagesh Mathapati
>Assignee: Rohan Desai
>Priority: Critical
> Attachments: CustomRocksDBConfig.java, MyStreamProcessor.java, 
> kafkaStreamConfig.java
>
>
> We are observing that the overall memory of our container keeps growing and
> never comes down.
> After analysis we found out that rocksdbjni.so keeps allocating 64M chunks
> of memory off-heap and never releases them back. This causes an OOM kill after
> memory reaches the configured limit.
> We use Kafka stream and globalktable for our many kafka topics.
> Below is our environment
>  * Kubernetes cluster
>  * openjdk 11.0.7 2020-04-14 LTS
>  * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS)
>  * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode)
>  * Springboot 2.3
>  * spring-kafka-2.5.0
>  * kafka-streams-2.5.0
>  * kafka-streams-avro-serde-5.4.0
>  * rocksdbjni-5.18.3
> Observed same result with kafka 2.3 version.
> Below is the snippet of our analysis
> from pmap output we took addresses from these 64M allocations (RSS)
> Address Kbytes RSS Dirty Mode Mapping
> 7f3ce800 65536 65532 65532 rw--- [ anon ]
> 7f3cf400 65536 65536 65536 rw--- [ anon ]
> 7f3d6400 65536 65536 65536 rw--- [ anon ]
> We tried to match with memory allocation logs enabled with the help of Azul 
> systems team.
> @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ff7ca0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4]
>  - 0x7f3ce8ff9780
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ff9750
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ff97c0
>  @ 
> /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ffccf0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ffcd10
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
>  - 0x7f3ce8ffccf0
>  @ /tmp/librocksdbjni6564497922441568920.so:
> _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
>  - 0x7f3ce8ffcd10
> We also identified that content on this 64M is just 0s and no any data 
> present in it.
> I tried to tune the RocksDB configuration as mentioned but it did not help. 
> [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  
> Please let me know if you need any more details





[GitHub] [kafka] huxihx commented on pull request #9189: KAFKA-10407: Have KafkaLog4jAppender support `linger.ms` and `batch.size`

2020-08-19 Thread GitBox


huxihx commented on pull request #9189:
URL: https://github.com/apache/kafka/pull/9189#issuecomment-676840253


   @omkreddy Thanks for the review, merging to trunk. @dongjinleekr Thanks for 
the comments again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx merged pull request #9189: KAFKA-10407: Have KafkaLog4jAppender support `linger.ms` and `batch.size`

2020-08-19 Thread GitBox


huxihx merged pull request #9189:
URL: https://github.com/apache/kafka/pull/9189


   







[jira] [Commented] (KAFKA-10407) add linger.ms parameter support to KafkaLog4jAppender

2020-08-19 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180901#comment-17180901
 ] 

huxihx commented on KAFKA-10407:


Merged this to trunk and 2.7 branch.

> add linger.ms parameter support to KafkaLog4jAppender
> -
>
> Key: KAFKA-10407
> URL: https://issues.apache.org/jira/browse/KAFKA-10407
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.7.0
>
>
> Currently KafkaLog4jAppender does not accept the `linger.ms` setting. When a 
> service has an outage that causes excessive error logging, the service can 
> send too many producer requests to the Kafka brokers and overload them. 
> Setting a non-zero `linger.ms` will allow the Kafka producer to batch records 
> and reduce the number of producer requests. 
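A log4j configuration using these appender options might look like the sketch below; the `lingerMs`/`batchSize` property names are assumptions based on the PR title, so verify the exact spelling against the merged appender code.

```properties
# Hypothetical log4j.properties sketch: batch error logs before sending
# them to Kafka, so a burst of logging does not overload the brokers.
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=service-logs
# Assumed property names for the new settings (check the merged PR):
log4j.appender.kafka.lingerMs=500
log4j.appender.kafka.batchSize=16384
```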





[jira] [Resolved] (KAFKA-10407) add linger.ms parameter support to KafkaLog4jAppender

2020-08-19 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10407.

Fix Version/s: 2.7.0
   Resolution: Fixed

> add linger.ms parameter support to KafkaLog4jAppender
> -
>
> Key: KAFKA-10407
> URL: https://issues.apache.org/jira/browse/KAFKA-10407
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.7.0
>
>
> Currently KafkaLog4jAppender does not accept the `linger.ms` setting. When a 
> service has an outage that causes excessive error logging, the service can 
> send too many producer requests to the Kafka brokers and overload them. 
> Setting a non-zero `linger.ms` will allow the Kafka producer to batch records 
> and reduce the number of producer requests. 





[GitHub] [kafka] abbccdda commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-19 Thread GitBox


abbccdda commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r473515635



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+unsentIsrUpdates synchronized {
+  unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(time.milliseconds)
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  unsentIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()

Review comment:
   Should we move the startup logic to `KafkaServer`? Note the channel is 
shared between different modules, so it makes sense to start and close inside 
the server.
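The coalescing pattern in `enqueueIsrUpdate` above (keep at most one pending update per partition, schedule a single flush at most 50 ms out) can be sketched with only the JDK; the names here are illustrative, not the PR's classes.

```java
import java.util.*;
import java.util.concurrent.*;

// Illustrative stdlib sketch of the PR's batching idea: updates arriving in
// fast succession are coalesced per key, and one flush is scheduled at most
// once per quiet period (50 ms in the PR), sending the latest value for
// each key in a single batch.
public class CoalescingSender<K, V> {
    private final Map<K, V> pending = new HashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final java.util.function.Consumer<Map<K, V>> sendBatch;
    private ScheduledFuture<?> scheduled; // guarded by `pending`

    CoalescingSender(long delayMs, java.util.function.Consumer<Map<K, V>> sendBatch) {
        this.delayMs = delayMs;
        this.sendBatch = sendBatch;
    }

    void enqueue(K key, V value) {
        synchronized (pending) {
            pending.put(key, value); // a newer value replaces any unsent one
            if (scheduled == null) {
                scheduled = scheduler.schedule(this::flush, delayMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    private void flush() {
        Map<K, V> batch;
        synchronized (pending) {
            batch = new HashMap<>(pending);
            pending.clear();
            scheduled = null;
        }
        sendBatch.accept(batch);
    }

    void shutdown() throws InterruptedException {
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

The PR's `clearPending` corresponds to removing a key from `pending` when a newer LeaderAndIsr arrives from the controller.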

##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collecti

[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-19 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180919#comment-17180919
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] Sure, I'll port PR #9038 onto 2.6.0 and share the log with you. Thanks.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks run longer than 30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible with both the 
> new CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol, the high 
> CPU usage dropped after the rebalance was done. But with the cooperative 
> one, the consumer threads seem stuck on something and cannot 
> finish the rebalance, so the high CPU usage won't drop until we stop our 
> load. A small load without long-running tasks also won't cause continuous 
> high CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  





[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-19 Thread GitBox


LMnet commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-676950368


   There were some failed style checks. I fixed them.







[GitHub] [kafka] showuon opened a new pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-08-19 Thread GitBox


showuon opened a new pull request #9202:
URL: https://github.com/apache/kafka/pull/9202


   Fix `currentStateTimeStamp` not getting set in 
`GROUP_METADATA_VALUE_SCHEMA_V3`, and do a small refactor to use 
`GROUP_VALUE_SCHEMAS.size - 1` in place of the hard-coded default max version 
number. Also add a test for it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-08-19 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-677036776


   @abbccdda @vahidhashemian @hachikuji , could you help review this PR? Thanks.







[GitHub] [kafka] huxihx opened a new pull request #9203: MINOR: Fix typo in LeaderEpochFileCacheTest

2020-08-19 Thread GitBox


huxihx opened a new pull request #9203:
URL: https://github.com/apache/kafka/pull/9203


   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


