[jira] [Resolved] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niketh Sabbineni resolved KAFKA-10453.
--
Resolution: Workaround

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with Kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|https://github.com/apache/kafka/pull/7781]. 
> Would you *recommend* backporting the above change to 1.1? There are 7-8 
> changes that need to be cherry-picked. The other option is to upgrade to 2.5 
> (which would be much more involved).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10072) KafkaConsumer configured with different client.id parameters obtains different results

2020-08-31 Thread Ankit Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankit Kumar reassigned KAFKA-10072:
---

Assignee: Ankit Kumar

> KafkaConsumer configured with different client.id parameters obtains 
> different results
> --
>
> Key: KAFKA-10072
> URL: https://issues.apache.org/jira/browse/KAFKA-10072
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: centos7.6 8C 32G
>Reporter: victor
>Assignee: Ankit Kumar
>Priority: Blocker
>
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}client.id=aa{color} --from-beginning --topic topicA
> {color:#DE350B}There's no data{color}
> kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property 
> {color:#DE350B}clientid=bb{color} --from-beginning --topic topicA
> {color:#DE350B}Successfully consumes data{color}
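
For reference, a minimal programmatic sketch of setting the client id (an 
illustration, not from the ticket; addresses and the group id are 
placeholders). Note that the consumer config key is client.id; an 
unrecognized key such as clientid is not rejected by the client, it is only 
logged as an unknown config.
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClientIdSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");         // hypothetical group
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "aa");                 // the "client.id" key
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topicA"));
        }
    }
}
{code}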





[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188116#comment-17188116
 ] 

Guozhang Wang commented on KAFKA-10134:
---

Hello [~zhowei], that broker-side change is not mandatory. I just included that 
part to make the whole PR complete; it is not a necessary change for your 
situation.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance under 
> some load (some tasks are long-running, >30s), the CPU goes sky-high. It 
> reads ~700% in our metrics, so several threads must be in a tight loop. We 
> have several consumer threads consuming from different partitions during the 
> rebalance. This is reproducible with both the new CooperativeStickyAssignor 
> and the old eager rebalance protocol. The difference is that with the old 
> eager protocol the high CPU usage drops after the rebalance is done, but 
> with the cooperative one the consumer threads seem to be stuck on something 
> and can't finish the rebalance, so the high CPU usage won't drop until we 
> stop our load. A small load without long-running tasks also doesn't cause 
> continuous high CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  
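
(For reference, a minimal sketch of how the cooperative protocol mentioned 
above is enabled on the consumer; the bootstrap address is a placeholder:)
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class CooperativeConfigSketch {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opts in to the incremental (cooperative) rebalance protocol that the
        // reporter upgraded to; the default assignor remains the eager one.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
{code}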





[jira] [Commented] (KAFKA-10366) TimeWindowedDeserializer doesn't allow users to set a custom window size

2020-08-31 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188115#comment-17188115
 ] 

Sophie Blee-Goldman commented on KAFKA-10366:
-

Yeah thanks for tracking that down [~mjsax]. I think we can 
repurpose/reinterpret this ticket to track fixing the Streams tests/test utils. 
If we have to add a lot of overloads throughout the call hierarchy between the 
test and the actual Consumer creation, maybe we can save some work by just 
adding a ConsumerParameters class (better names welcome) and replacing the 
ConsumerConfig/Properties parameter in all the methods instead. Then we can 
just call the appropriate Consumer constructor based on whether the 
deserializers have been set in the passed-in ConsumerParameters. 

> TimeWindowedDeserializer doesn't allow users to set a custom window size
> 
>
> Key: KAFKA-10366
> URL: https://issues.apache.org/jira/browse/KAFKA-10366
> Project: Kafka
>  Issue Type: Bug
>Reporter: Leah Thomas
>Assignee: Leah Thomas
>Priority: Major
>  Labels: streams
>
> Related to [KAFKA-4468|https://issues.apache.org/jira/browse/KAFKA-4468], in 
> TimeWindowedDeserializer, Long.MAX_VALUE is used as the _windowSize_ for any 
> deserializer created with the default constructor. While Streams apps can 
> pass in a window size in serdes or while creating a TimeWindowedDeserializer, 
> the deserializer that is actually used in processing the messages is created 
> by the Kafka consumer, without passing in the configured windowSize. The 
> deserializer the consumer creates uses the configs, but as there is no config 
> for windowSize, the window size is always the default.
> See _KStreamAggregationIntegrationTest#shouldReduceWindowed()_ as an example 
> of this issue. Despite passing the windowSize to both the serdes and the 
> TimeWindowedDeserializer, the window size is set to Long.MAX_VALUE. 
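
To illustrate the two construction paths (a minimal sketch assuming the 
standard TimeWindowedDeserializer constructors; the class and window size 
here are illustrative):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;

public class WindowSizeSketch {
    public static void main(final String[] args) {
        // Constructed directly, the given window size is respected:
        final TimeWindowedDeserializer<String> explicit =
            new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);

        // But the consumer instantiates its deserializer reflectively and
        // configures it from configs alone; since no config carries a window
        // size, this path falls back to the Long.MAX_VALUE default:
        final TimeWindowedDeserializer<String> fromConfigs =
            new TimeWindowedDeserializer<>();
    }
}
{code}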





[GitHub] [kafka] mumrah commented on pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


mumrah commented on pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#issuecomment-684175203


   Example failed build with a compile error: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9226/67
   
   One with a rat failure: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9226/66/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jigar Naik updated KAFKA-10450:
---
Priority: Minor  (was: Critical)

> console-producer throws Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender) 
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> ---
>
> Key: KAFKA-10450
> URL: https://issues.apache.org/jira/browse/KAFKA-10450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: Kafka Version 2.6.0
> MacOS Version - macOS Catalina 10.15.6 (19G2021)
> java version "11.0.8" 2020-07-14 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
>Reporter: Jigar Naik
>Priority: Minor
>
> kafka-console-producer.sh gives the below error on Mac: 
> ERROR [Producer clientId=console-producer] Uncaught error in kafka producer 
> I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> *Steps to reproduce the issue:* 
> Download Kafka from 
> [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
>  
> Change the data and log directories (optional).
> Create a topic using the command below: 
>  
> {code:java}
> ./kafka-topics.sh \
>  --create \
>  --zookeeper localhost:2181 \
>  --replication-factor 1 \
>  --partitions 1 \
>  --topic my-topic{code}
>  
> Start the Kafka console consumer using the command below:
>  
> {code:java}
> ./kafka-console-consumer.sh \
>  --topic my-topic \
>  --from-beginning \
>  --bootstrap-server localhost:9092{code}
>  
> Then running the console producer gives the output below:
>  
> {code:java}
> ./kafka-console-producer.sh \
>  --topic my-topic \
>      --bootstrap-server 127.0.0.1:9092
> >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] 
> >Uncaught error in kafka producer I/O thread:  
> >(org.apache.kafka.clients.producer.internals.Sender)
> java.nio.BufferUnderflowException
> at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
> at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
> at 
> org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
> at 
> org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught 
> error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> at 
> org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
> at 
> org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap 
> broker 127.0.0.1:9092 (id: -1 rack: null) disconnected 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
>  
> The same steps work fine with Kafka version 2.0.0 on Mac. 
> The same steps work fine with Kafka version 2.6.0 on Windows. 
>  
>  





[jira] [Commented] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are n

2020-08-31 Thread Jigar Naik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188085#comment-17188085
 ] 

Jigar Naik commented on KAFKA-10450:


Yes, same version for both. The Kafka console consumer that ships with Kafka 
gave the same error as well. 

I have found the issue: port 9092 was being used by the SonarQube H2 DB. 
After stopping SonarQube, everything worked fine. 
Surprisingly, I was expecting an "address already in use" error instead of 
this stack trace, and kafka-topics.sh also worked fine without any issue. 

Changing the priority to Minor as it's no longer a blocker. It would be better 
if a proper error message were displayed instead of this stack trace; it would 
have saved me a few hours. :) Thanks! 
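
(For anyone hitting the same symptom, a quick hypothetical probe to check 
whether some other process is already listening on the port; a tool like lsof 
or netstat will tell you which process it actually is:)
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {
    public static void main(final String[] args) {
        // Attempts a plain TCP connect to 127.0.0.1:9092 with a 1s timeout.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("127.0.0.1", 9092), 1_000);
            System.out.println("Something is already listening on 9092.");
        } catch (final IOException e) {
            System.out.println("Nothing is listening on 9092: " + e.getMessage());
        }
    }
}
{code}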


[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jigar Naik updated KAFKA-10450:
---
Priority: Critical  (was: Blocker)






[jira] [Commented] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are n

2020-08-31 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188082#comment-17188082
 ] 

huxihx commented on KAFKA-10450:


Same version for both clients and brokers?






[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-31 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188058#comment-17188058
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] one more question about PR #8834: are the *GroupCoordinator* 
changes mandatory? I ask because Kafka server-side changes would be more 
expensive to roll out than client-side ones.






[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-31 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480482350



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##
@@ -67,14 +70,22 @@ public Bytes peekNextKey() {
     public boolean hasNext() {
         boolean hasNext = false;
         while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
-            && segments.hasNext()) {
+               && segments.hasNext()) {
             close();
             currentSegment = segments.next();
             try {
                 if (from == null || to == null) {
-                    currentIterator = currentSegment.all();
+                    if (forward) {
+                        currentIterator = currentSegment.all();
+                    } else {
+                        currentIterator = currentSegment.reverseAll();

Review comment:
   Yeah, that's a good question; it does seem like we can just remove the 
`range` and `all` on the Segment interface.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-31 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480481060



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -338,25 +452,36 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
-            this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
-            this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            setCacheKeyRange(timeFrom, currentSegmentLastTime());
+            if (forward) {
+                this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+                this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+                setCacheKeyRange(timeFrom, currentSegmentLastTime());
+                this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            } else {
+                this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+                this.lastSegmentId = cacheFunction.segmentId(timeFrom);
+
+                setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get()));

Review comment:
   This looks right to me -- in the iterator constructor, we would normally 
start from `timeFrom` (the minimum time) and advance to the end of the current 
segment (that's what the "cache key range" defines: the range of the current 
segment). When iterating backwards, the current segment is actually the largest 
segment, so the cache key lower range is the current (largest) segment's 
beginning timestamp, and the upper range is the maximum timestamp of the 
backwards fetch. Does that make sense?
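
   For intuition, a small standalone sketch of the segment arithmetic 
described above (the names and segment interval are illustrative, not the 
actual CachingWindowStore members):

```java
public class BackwardSegmentSketch {
    static final long SEGMENT_INTERVAL = 1_000L;

    static long segmentId(final long timestamp) {
        return timestamp / SEGMENT_INTERVAL;
    }

    static long segmentBeginTime(final long segmentId) {
        return segmentId * SEGMENT_INTERVAL;
    }

    public static void main(final String[] args) {
        final long timeFrom = 0L;
        final long timeTo = 5_500L;
        final long maxObserved = 4_200L; // highest timestamp actually written

        // Backward iteration starts at the largest populated segment...
        final long currentSegmentId = segmentId(Math.min(timeTo, maxObserved)); // 4
        final long lastSegmentId = segmentId(timeFrom);                         // 0
        // ...and the first cache key range runs from that segment's begin time
        // up to the maximum timestamp of the backward fetch:
        final long lower = segmentBeginTime(currentSegmentId);                  // 4000
        final long upper = Math.min(timeTo, maxObserved);                       // 4200
        System.out.printf("segments %d down to %d, first range [%d, %d]%n",
                          currentSegmentId, lastSegmentId, lower, upper);
    }
}
```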









[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-31 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480479677



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -271,27 +345,68 @@ public synchronized void put(final Bytes key,
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
             new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
         return new MergedSortedCacheWindowStoreKeyValueIterator(
-            filteredCacheIterator,
-            underlyingIterator,
-            bytesSerdes,
-            windowSize,
-            cacheFunction
+            filteredCacheIterator,
+            underlyingIterator,
+            bytesSerdes,
+            windowSize,
+            cacheFunction,
+            true
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom,

Review comment:
   We would still need to keep this method: we're not removing all 
long-based APIs, just the public/IQ methods in ReadOnlyWindowStore. But we 
still want to keep the long-based methods on WindowStore and all the internal 
store interfaces for performance reasons.
   Maybe once we move everything to use `Instant` all the way down to the 
serialization, we can remove these long-based methods. I guess we should 
consider that when discussing KIP-667, but for the time being at least, we 
should keep them for internal use.
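
   A tiny sketch of that boundary-conversion pattern (the interface and 
method names here are illustrative, not the real store interfaces):

```java
import java.time.Instant;

// The Instant-based public method converts once at the API boundary and
// delegates to a primitive-long overload that internal hot paths can keep
// calling directly, avoiding per-call Instant allocations.
interface WindowFetchSketch {
    Iterable<byte[]> backwardFetchAll(long timeFromMs, long timeToMs); // internal overload

    default Iterable<byte[]> backwardFetchAll(final Instant timeFrom, final Instant timeTo) {
        return backwardFetchAll(timeFrom.toEpochMilli(), timeTo.toEpochMilli());
    }
}
```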









[GitHub] [kafka] guozhangwang commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480477639



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
             statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {

Review comment:
   Hmm, why do we need the second condition to determine `singleCache = false` 
here?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
             statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+                singleCache = false;
+            } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+                throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+                    " are either not all distinct or do not all refer to the

[GitHub] [kafka] guozhangwang commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480469688



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -136,34 +174,64 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param fromTime  time range start (inclusive)
-     * @param toTime    time range end (inclusive)
-     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive), where iteration starts.
+     * @param timeTo   time range end (inclusive), where iteration ends.
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
-     * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
      */
-    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
+    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant timeFrom, Instant timeTo)
         throws IllegalArgumentException;
 
     /**
-     * Gets all the key-value pairs in the existing windows.
-     *
-     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
-     * @throws InvalidStateStoreException if the store is not initialized
-     */
+     * Get all the key-value pairs in the given key range and time range from all the existing windows
+     * in backward order with respect to time (from end to beginning of time).
+     * <p>
+     * This iterator must be closed after use.
+     *
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive), where iteration ends.
+     * @param timeTo   time range end (inclusive), where iteration starts.
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
+     */
+    KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo)

Review comment:
   This is outside the scope of this PR, but I'd like to point out that the 
current IQ does not actually obey the ordering when there are multiple local 
stores hosted on that instance. For example, if there are two stores from two 
tasks hosting keys {1, 3} and {2, 4}, then a range query of key [1, 4] would 
return in the order `1,3,2,4`, not `1,2,3,4`, since it is looping over the 
stores only. This would be the case for either forward or backward fetches on 
range-key-range-time.
   
   For a single-key time-range fetch, of course, there's no such issue.
   
   I think it's worth documenting this for now until we have a fix (and 
actually we are going to propose something soon).
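
   To make the ordering caveat concrete, a self-contained sketch (plain 
collections standing in for the per-task stores; not the actual IQ code):

```java
import java.util.ArrayList;
import java.util.List;

public class IqOrderingSketch {
    public static void main(final String[] args) {
        final List<Integer> store1 = List.of(1, 3); // keys hosted by one task's store
        final List<Integer> store2 = List.of(2, 4); // keys hosted by another task's store

        // What looping over the stores yields today: per-store order only.
        final List<Integer> concatenated = new ArrayList<>(store1);
        concatenated.addAll(store2);                // [1, 3, 2, 4]

        // The globally sorted order a caller might expect from a range query.
        final List<Integer> merged = new ArrayList<>(concatenated);
        merged.sort(null);                          // [1, 2, 3, 4]

        System.out.println(concatenated + " vs " + merged);
    }
}
```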

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -33,11 +33,11 @@
     /**
      * Get the value of key from a window.
      *
-     * @param key   the key to fetch
-     * @param time  start timestamp (inclusive) of the window
+     * @param key  the key to fetch
+     * @param time start timestamp (inclusive) of the window
      * @return The value or {@code null} if no value is found in the window
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
+     * @throws NullPointerException       If {@code null} is used for any key.

Review comment:
   nit: is this intentional? Also I'd suggest we do not use capitalized 
`If` to be consistent with the above line, ditto elsewhere below.

##
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##
@@ -150,13 +185,25 @@
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
+    // note, this method must be kept if super#fetchAll(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> 

[jira] [Assigned] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2020-08-31 Thread Aakash Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aakash Gupta reassigned KAFKA-2200:
---

Assignee: Aakash Gupta

> kafkaProducer.send() should not call callback.onCompletion()
> 
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Aakash Gupta
>Priority: Major
>  Labels: newbie
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.
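
For intuition, a self-contained sketch of how a synchronous callback on the 
failure path can jump the queue (illustrative only, not KafkaProducer 
internals):
{code:java}
import java.util.ArrayList;
import java.util.List;

public class CallbackOrderSketch {
    interface Callback { void onCompletion(String result); }

    private final List<Callback> inFlight = new ArrayList<>();

    void send(final boolean failsValidation, final Callback callback) {
        if (failsValidation) {
            callback.onCompletion("failed"); // fires immediately, inside send()
        } else {
            inFlight.add(callback);          // completed later by the I/O thread
        }
    }

    void drainAsIoThreadWould() {
        inFlight.forEach(callback -> callback.onCompletion("ok"));
        inFlight.clear();
    }

    public static void main(final String[] args) {
        final CallbackOrderSketch producer = new CallbackOrderSketch();
        producer.send(false, r -> System.out.println("record A: " + r)); // sent first
        producer.send(true, r -> System.out.println("record B: " + r));  // fails fast
        producer.drainAsIoThreadWould();
        // Prints "record B" before "record A": the firing order is broken.
    }
}
{code}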





[jira] [Comment Edited] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2020-08-31 Thread Aakash Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188042#comment-17188042
 ] 

Aakash Gupta edited comment on KAFKA-2200 at 8/31/20, 11:20 PM:


Hi [~becket_qin],
 I am willing to take this ticket. 

As of today, this is how exceptions are handled in the 
KafkaProducer.send() method:
{code:java}
catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is 
called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
{code}
 # For the TimeoutException thrown while waiting for a metadata update, what is 
your suggestion? How should it be handled if not via the ApiException callback? 
As you mentioned, we are misusing this TimeoutException, since the idea was to 
use it only where replication couldn't complete within the allowed time. Should 
we create a new exception 'ClientTimeoutException' to handle such scenarios, 
and also use it in the waitOnMetadata() method?
 # Validation of the message size throws RecordTooLargeException, which extends 
ApiException. In this case, you are correct to say that the producer client 
throws RecordTooLargeException without even interacting with the server.
You've suggested 2 scenarios which can cause exceptions:
 ## *If the size of the serialized uncompressed message is more than 
maxRequestSize*: I'm not sure we can estimate the size of the message while 
taking the compression type into consideration. So the current implementation 
throws RecordTooLargeException based on the ESTIMATE, without taking the 
compression type into account. What is the expected behaviour in this case? 
 ## *If the message size is bigger than the totalMemorySize or 
memoryBufferSize*: the buffer pool would throw IllegalArgumentException when 
asked for an allocation. Should we just catch this exception, record it, and 
throw it back?

[~becket_qin] Can you please answer the above queries and validate my 
understanding? Apologies if I've misunderstood something, as I am new to the 
Kafka community.



[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2020-08-31 Thread Aakash Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188042#comment-17188042
 ] 

Aakash Gupta commented on KAFKA-2200:
-


> kafkaProducer.send() should not call callback.onCompletion()
> 
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: newbie
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.





[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


mumrah commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480452574



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+    deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+    sh '''
+      ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \
+          spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+          --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+    '''
+  } catch(err) {
+    error('Validation checks failed, aborting this build')
+  }
+}
+
+def doTest() {
+  try {
+    sh '''
+      ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+          --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@"
+    '''
+  } catch(err) {
+    echo 'Some tests failed, marking this build UNSTABLE'
+    currentBuild.result = 'UNSTABLE'

Review comment:
   So, this seems to have caused us to lose which build had which results 
in the summary. 
   
   Before we had:
   
![image](https://user-images.githubusercontent.com/55116/91776201-e2d0ca80-ebba-11ea-92f7-77932dbac763.png)
   
   Now we have: 
   
![image](https://user-images.githubusercontent.com/55116/91776184-d9dff900-ebba-11ea-90a3-8daf3d6c384c.png)
   
   I'll try moving the `junit` call inside the actual stage and see if that 
helps









[GitHub] [kafka] vvcephei commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-31 Thread GitBox


vvcephei commented on pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#issuecomment-684074223


   Since Jenkins PR builds are still not functioning, I've merged in trunk and 
verified this pull request locally before merging it.







[GitHub] [kafka] vvcephei merged pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-31 Thread GitBox


vvcephei merged pull request #9039:
URL: https://github.com/apache/kafka/pull/9039


   







[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r480429496



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   Previously this function had two lines: update the state and record the 
sensors. Now that the first is done in the caller, this function became a 
one-liner and was no longer worth keeping as a separate method, so I inlined it.
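   
   For illustration, a minimal compilable Java sketch of the refactor described 
above (the field and sensor names are simplified stand-ins, not the exact 
AbstractCoordinator code):
   
   ```
   // Before: a two-line helper that both updated the state and recorded the metric.
   class CoordinatorBefore {
       enum MemberState { UNJOINED, REBALANCING, STABLE }
       MemberState state = MemberState.REBALANCING;
       int failedRebalances = 0; // stand-in for the failed-rebalance sensor
   
       void onJoinFailure() {
           recordRebalanceFailure();
       }
   
       private void recordRebalanceFailure() {
           state = MemberState.UNJOINED;
           failedRebalances++;
       }
   }
   
   // After: the state transition moved into the caller, leaving a one-liner,
   // so the former helper body is simply inlined at the call site.
   class CoordinatorAfter {
       enum MemberState { UNJOINED, REBALANCING, STABLE }
       MemberState state = MemberState.REBALANCING;
       int failedRebalances = 0;
   
       void onJoinFailure() {
           state = MemberState.UNJOINED; // now done by the caller
           failedRebalances++;           // inlined recordRebalanceFailure()
       }
   }
   ```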





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-31 Thread GitBox


vvcephei commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r480429410



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            // catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore =

[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r480425820



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
 joinFuture.addListener(new RequestFutureListener() {
 @Override
 public void onSuccess(ByteBuffer value) {
-// handle join completion in the callback so that the 
callback will be invoked

Review comment:
   Well, I should say part of that (enabling the heartbeat thread) is in the 
JoinGroup response handler, while the rest (updating metrics, etc.) is in the 
SyncGroup response handler.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)
+  // consumers may start sending heartbeat after join-group 
response, in which case
+  // we should treat them as normal hb request and reset the timer
+  val member = group.get(memberId)

Review comment:
   It used to return the error code because the broker did not expect clients 
to send heartbeats before sending sync-group requests. That is no longer the 
case.
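   
   As a toy Java sketch of the new behavior (illustrative only; the real logic 
is the Scala `GroupCoordinator` code in the diff above, and the names here are 
simplified stand-ins):
   
   ```
   enum GroupState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
   
   final class HeartbeatSketch {
       // Returns the error code to send back; resetSessionTimer models
       // rescheduling the member's heartbeat expiration.
       String handleHeartbeat(final GroupState state, final Runnable resetSessionTimer) {
           switch (state) {
               case COMPLETING_REBALANCE:
                   // a consumer may heartbeat right after the join-group response;
                   // treat it as a liveness signal and reset its timer, while still
                   // returning REBALANCE_IN_PROGRESS for client compatibility
                   resetSessionTimer.run();
                   return "REBALANCE_IN_PROGRESS";
               case STABLE:
                   resetSessionTimer.run();
                   return "NONE";
               default:
                   return "REBALANCE_IN_PROGRESS"; // other states/checks elided
           }
       }
   }
   ```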

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
   I had a discussion with @hachikuji about this. I think logically it should 
not return `REBALANCE_IN_PROGRESS`, and clients should update their handling 
logic too in the future, perhaps after a few releases, once we can break 
client-broker compatibility.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();
 resetJoinGroupFuture();
-return false;
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Join group failed with {}", exception.toString());
+log.info("Rebalance failed with {}", exception.toString());

Review comment:
   The reason I changed it is exactly that it may not always be due to the 
join-group :) If the sync-group failed, this could also be triggered.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188031#comment-17188031
 ] 

Niketh Sabbineni commented on KAFKA-10453:
--

Thank you! Let me test that out then. Thanks for your help.

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476954744



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
 log.debug("Resetting generation after encountering {} from {} response 
and requesting re-join", error, api);
 
-// only reset the state to un-joined when it is not already in 
rebalancing

Review comment:
   We do not need this check any more, since we now only reset the generation 
when we see an illegal-generation or unknown-member-id error, and in either 
case we should no longer heartbeat.
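   
   In code terms, a simplified and hypothetical Java sketch of that reasoning 
(not the actual client internals):
   
   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Sketch: the reset path is only reached for the two fatal errors, so the
   // old "skip if already rebalancing" guard is unnecessary -- in either case
   // the member must stop heartbeating and rejoin.
   final class GenerationResetSketch {
       private static final Logger LOG = LoggerFactory.getLogger(GenerationResetSketch.class);
   
       enum FatalError { ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID }
   
       synchronized void resetGenerationOnResponseError(final String api, final FatalError error) {
           LOG.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
           resetStateAndRejoin(); // unconditional: no rebalancing-state check needed
       }
   
       private void resetStateAndRejoin() {
           // clear the generation / member id and transition to UNJOINED (elided)
       }
   }
   ```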





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188029#comment-17188029
 ] 

Ismael Juma commented on KAFKA-10453:
-

Yes, it is.

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188028#comment-17188028
 ] 

Niketh Sabbineni edited comment on KAFKA-10453 at 8/31/20, 10:07 PM:
-

[~ijuma] Thanks for your reply. Do you happen to know if kafka 2.5 client is 
compatible with kafka 1.1.1 core/server? I should have checked that before 
opening the issue. I am digging to check that.

 


was (Author: niketh):
[~ijuma] Thanks for your reply. Do you happen to know if kafka 2.5 client is 
compatible with kafka 1.1.1 core/server? I should have checked that before 
opening the issue.

 

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188028#comment-17188028
 ] 

Niketh Sabbineni commented on KAFKA-10453:
--

[~ijuma] Thanks for your reply. Do you happen to know if kafka 2.5 client is 
compatible with kafka 1.1.1 core/server? I should have checked that before 
opening the issue.

 

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188024#comment-17188024
 ] 

Ismael Juma commented on KAFKA-10453:
-

Kafka clients have no ZK dependency, so you can upgrade them without changing 
anything else.

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties

2020-08-31 Thread GitBox


tinawenqiao commented on pull request #9235:
URL: https://github.com/apache/kafka/pull/9235#issuecomment-684064832


   In WorkerConfig.java we found that REST_HOST_NAME_CONFIG (rest.host.name) 
and REST_PORT_CONFIG (rest.port) are deprecated, and that some new 
configuration parameters, such as LISTENERS_CONFIG (listeners), 
REST_ADVERTISED_LISTENER_CONFIG (rest.advertised.listener), and 
ADMIN_LISTENERS_CONFIG (admin.listeners), were introduced but are not listed in 
the sample conf file.
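   
   For example, the sample file could call these out along the following lines 
(the hostnames and ports below are placeholder values, not recommendations):
   
   ```
   # rest.host.name and rest.port are deprecated; configure listeners instead:
   listeners=http://0.0.0.0:8083
   # Advertised listener (http or https) given to other workers, if it differs:
   rest.advertised.listener=http
   # Optional separate listener(s) for the admin REST endpoints:
   admin.listeners=http://localhost:8084
   ```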



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tinawenqiao opened a new pull request #9235: KAFKA-10449: Add some important parameter description in connect-distributed.prope…

2020-08-31 Thread GitBox


tinawenqiao opened a new pull request #9235:
URL: https://github.com/apache/kafka/pull/9235


   …rties.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188021#comment-17188021
 ] 

Niketh Sabbineni commented on KAFKA-10453:
--

IMO [https://github.com/apache/kafka/pull/7781] is a good fix to have in older 
releases too, if we can backport it easily.

 

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188020#comment-17188020
 ] 

Niketh Sabbineni commented on KAFKA-10453:
--

We are running on an older version of ZK. We need to upgrade to 3.x (3.4) to be 
compatible with kafka 2.5. I am trying to see if we can solve the production 
issues that we are seeing with Kafka 1.1.1 by backporting (or cherry picking if 
necessary in our fork) instead of a full upgrade spanning multiple components.

 

 

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188017#comment-17188017
 ] 

Ismael Juma commented on KAFKA-10453:
-

Can you help us understand why it would be much more involved to upgrade to 
2.5? Our aim is to make it very easy to upgrade to newer versions of the 
clients.

> Backport of PR-7781
> ---
>
> Key: KAFKA-10453
> URL: https://issues.apache.org/jira/browse/KAFKA-10453
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Niketh Sabbineni
>Priority: Major
>
> We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
> forever to load metadata. The issue seems to have been patched in master 
> [here|[https://github.com/apache/kafka/pull/7781]]. 
> Would you *recommend* a backport of that above change to 1.1? There are 7-8 
> changes that need to be cherry picked. The other option is to upgrade to 2.5 
> (which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10453) Backport of PR-7781

2020-08-31 Thread Niketh Sabbineni (Jira)
Niketh Sabbineni created KAFKA-10453:


 Summary: Backport of PR-7781
 Key: KAFKA-10453
 URL: https://issues.apache.org/jira/browse/KAFKA-10453
 Project: Kafka
  Issue Type: Wish
  Components: clients
Affects Versions: 1.1.1
Reporter: Niketh Sabbineni


We have been hitting this bug (with kafka 1.1.1) where the Producer takes 
forever to load metadata. The issue seems to have been patched in master 
[here|[https://github.com/apache/kafka/pull/7781]]. 

Would you *recommend* a backport of that above change to 1.1? There are 7-8 
changes that need to be cherry picked. The other option is to upgrade to 2.5 
(which would be much more involved)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480407445



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 }
   }
 
+  protected def createScramAdminClient(scramMechanism: String, user: String, 
password: String): Admin = {

Review comment:
   Ok, this now invokes a new method on SaslSetup().





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


lbradstreet commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480404009



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')
+  }
+}
+
+def doTest() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+  --profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed "$@"
+'''
+  } catch(err) {
+echo 'Some tests failed, marking this build UNSTABLE'
+currentBuild.result = 'UNSTABLE'

Review comment:
   Correct, it'll be picked up when the test output files are collected.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9234: MINOR: Record all poll invocations

2020-08-31 Thread GitBox


vvcephei commented on pull request #9234:
URL: https://github.com/apache/kafka/pull/9234#issuecomment-684038074


   Hey @guozhangwang and @cadonna , I was just looking into key metrics for 
monitoring, and it seems like this metric is misplaced. Unlike the 
`pollRecordsSensor`, it seems like the `pollSensor` should be recorded 
unconditionally. WDYT?
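   
   For context, a minimal sketch of the move being suggested (method and field 
names are simplified; this is not the exact StreamThread code):
   
   ```
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.common.metrics.Sensor;
   
   // Sketch: record the poll sensor on every Consumer#poll invocation, and keep
   // the records sensor conditional on actually receiving records.
   final class PollRecordingSketch {
       ConsumerRecords<byte[], byte[]> pollRequests(final Consumer<byte[], byte[]> consumer,
                                                    final Sensor pollSensor,
                                                    final Sensor pollRecordsSensor,
                                                    final Duration pollTime) {
           pollSensor.record(); // unconditional: counts every poll invocation
           final ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTime);
           if (!records.isEmpty()) {
               pollRecordsSensor.record(records.count()); // still conditional
           }
           return records;
       }
   }
   ```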



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9234: MINOR: Record all poll invocations

2020-08-31 Thread GitBox


vvcephei opened a new pull request #9234:
URL: https://github.com/apache/kafka/pull/9234


   Record the `pollSensor` after every invocation of poll, rather than just 
when we get records back, so that we can accurately gauge how often we're 
invoking Consumer#poll.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


mumrah commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480393840



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')
+  }
+}
+
+def doTest() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+  --profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed "$@"
+'''
+  } catch(err) {
+echo 'Some tests failed, marking this build UNSTABLE'
+currentBuild.result = 'UNSTABLE'

Review comment:
   I see, so in this case we'll let the `junit` directive set the build to 
unstable?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10452) Only expire preferred read replica if a leader is alive for the topic

2020-08-31 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-10452:
-
Description: 
Fetch from follower functionality periodically expires and refreshes preferred 
read replica (at `metadata.max.age.ms` interval). This allows a client to 
discover a better follower to fetch from if one becomes available.

However, the expiration is done even if the current partition has no leader. It 
makes sense to fetch the new preferred replica information and update the 
existing one, instead of expiring the existing one and then fetching a new one.

Doing this will allow clients to keep fetching from a follower instead of 
failing to find the leader when all ISR replicas go offline.

 

  was:
Fetch from follower functionality periodically expires and refreshes preferred 
read replica (at `metadata.max.age.ms` interval). This allows a client to 
discover a better follower to fetch from if one becomes available.

However the expiration is done even if the current partition has no leader (can 
happen in DR scenario with observers). It makes sense to get the new preferred 
replica information and update existing one, instead of expiring existing one 
and then fetching new one.

Doing this will allow clients to keep on fetching from a follower/observer 
instead of failing to find leader when all ISR replicas go offline.

 


> Only expire preferred read replica if a leader is alive for the topic
> -
>
> Key: KAFKA-10452
> URL: https://issues.apache.org/jira/browse/KAFKA-10452
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> Fetch from follower functionality periodically expires and refreshes 
> preferred read replica (at `metadata.max.age.ms` interval). This allows a 
> client to discover a better follower to fetch from if one becomes available.
> However, the expiration is done even if the current partition has no leader. 
> It makes sense to fetch the new preferred replica information and update the 
> existing one, instead of expiring the existing one and then fetching a new one.
> Doing this will allow clients to keep fetching from a follower instead of 
> failing to find the leader when all ISR replicas go offline.
>  
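
For illustration, a hypothetical client-side sketch of the proposed behavior 
(all names here are invented; this is not the actual fetcher code):

{code:java}
import java.util.Optional;

// Sketch: on expiry, only clear the preferred read replica when a leader is
// currently known; otherwise keep fetching from the existing follower.
final class PreferredReadReplicaSketch {
    private Optional<Integer> preferredReadReplica = Optional.empty();
    private long expireTimeMs = Long.MAX_VALUE;

    Optional<Integer> maybeExpire(final long nowMs, final Optional<Integer> currentLeader) {
        if (nowMs >= expireTimeMs && currentLeader.isPresent()) {
            preferredReadReplica = Optional.empty(); // safe: we can fall back to the leader
        }
        // if no leader is alive, retain the existing preferred replica
        return preferredReadReplica;
    }
}
{code}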



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


lbradstreet commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480392178



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')
+  }
+}
+
+def doTest() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \
+  --profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed "$@"
+'''
+  } catch(err) {
+echo 'Some tests failed, marking this build UNSTABLE'
+currentBuild.result = 'UNSTABLE'

Review comment:
   I think this may mark a build as UNSTABLE if one of gradle's executors 
exited. We're trying to achieve the property that the build won't fail 
completely if one of the tests fails, right? If so, I think the best way is to 
supply `-PignoreFailures=true` at 
https://github.com/apache/kafka/pull/9226/files#diff-58231b16fdee45a03a4ee3cf94a9f2c3R44,
 and add:
   
   ```
   task integrationTest(type: Test, dependsOn: compileJava) {
     ignoreFailures = userIgnoreFailures
   ```
   
   ```
   task unitTest(type: Test, dependsOn: compileJava) {
     ignoreFailures = userIgnoreFailures
   ```
   
   ```
   ext {
     userIgnoreFailures = project.hasProperty('ignoreFailures') ? ignoreFailures : false
   ```
   
   to build.gradle.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


mumrah commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480391239



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')

Review comment:
   I think the err here will be something like an ExecutionException -- I'm 
going to try something different from this try/catch, actually





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


mumrah commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480388916



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'

Review comment:
   Yes, it's informational, so we can see the Gradle version and the JDK that's 
running





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing

2020-08-31 Thread GitBox


ijuma commented on a change in pull request #9226:
URL: https://github.com/apache/kafka/pull/9226#discussion_r480364696



##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'

Review comment:
   Is this informational so that we know the gradle version? Or is there 
more to it?

##
File path: Jenkinsfile
##
@@ -0,0 +1,200 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+def setupGradle() {
+  // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167
+  dir('.gradle') {
+deleteDir()
+  }
+  sh './gradlew -version'
+}
+
+def doValidation() {
+  try {
+sh '''
+  ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala 
compileTestJava compileTestScala \
+  spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+  --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\"
+'''
+  } catch(err) {
+error('Validation checks failed, aborting this build')

Review comment:
   Should we include `err` in the output somehow? Or is that not needed?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10452) Only expire preferred read replica if a leader is alive for the topic

2020-08-31 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-10452:


 Summary: Only expire preferred read replica if a leader is alive 
for the topic
 Key: KAFKA-10452
 URL: https://issues.apache.org/jira/browse/KAFKA-10452
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Jeff Kim
Assignee: Jeff Kim


Fetch from follower functionality periodically expires and refreshes preferred 
read replica (at `metadata.max.age.ms` interval). This allows a client to 
discover a better follower to fetch from if one becomes available.

However, the expiration is done even if the current partition has no leader 
(which can happen in a DR scenario with observers). It makes sense to fetch the 
new preferred replica information and update the existing one, instead of 
expiring the existing one and then fetching a new one.

Doing this will allow clients to keep fetching from a follower/observer instead 
of failing to find the leader when all ISR replicas go offline.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10429) Group Coordinator unavailability leads to missing events

2020-08-31 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187992#comment-17187992
 ] 

John Roesler commented on KAFKA-10429:
--

Hi Navinder,

My first thought is that version 1.1.1 is extremely old, and a lot has actually 
changed in the consumers since then. Is there any chance you can try with a 
newer version of Streams and see if you still observe the issue?

Aside from that, from the logs you posted, it looks like it only took that 
instance a few seconds to re-acquire the connection to the coordinator, but the 
next paragraph implies that disconnections have lasted hours. Can you clarify?

A few other notes:
 * Disconnecting from the coordinator shouldn't interrupt processing, since you 
can still fetch from the leader and followers of the topic partitions you're 
assigned
 * If an instance is disconnected for longer than the session interval, you 
would actually see rebalances caused by that instance having dropped out of the 
group
 * If the log cleaner removes some offsets after the consumer's current 
position, there would be an InvalidOffsetException (unless there's an 
auto-reset policy configured), so you wouldn't silently miss data
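
For instance, a minimal sketch of configuring the auto-reset policy mentioned 
in the last point (the broker address and group id are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Placeholder values; auto.offset.reset is the relevant setting: with "none"
// a position removed by retention raises an InvalidOffsetException, while
// "earliest"/"latest" silently reset the position instead.
final class AutoResetExample {
    static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest" / "none"
        return props;
    }
}
{code}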

> Group Coordinator unavailability leads to missing events
> 
>
> Key: KAFKA-10429
> URL: https://issues.apache.org/jira/browse/KAFKA-10429
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Navinder Brar
>Priority: Major
>
> We are regularly getting this Exception in logs.
> [2020-08-25 03:24:59,214] INFO [Consumer 
> clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group 
> coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, 
> will attempt rediscovery 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> And after sometime it becomes discoverable:
> [2020-08-25 03:25:02,218] INFO [Consumer 
> clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
> groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: 
> null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> Now, the doubt I have is why this unavailability doesn't trigger a rebalance 
> in the cluster. We have few hours of retention on the source Kafka Topics and 
> sometimes this unavailability stays over for more than few hours and since it 
> doesn't trigger a rebalance or stops processing on other nodes(which are 
> connected to GC) we never come to know that some issue has happened and till 
> then we lose events from our source topics. 
>  
> There are some resolutions mentioned on stackoverflow but those configs are 
> already set in our kafka:
> default.replication.factor=3
> offsets.topic.replication.factor=3
>  
> It would be great to understand why this issue is happening and why it 
> doesn't trigger a rebalance and is there any known solution for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480370634



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 }
   }
 
+  protected def createScramAdminClient(scramMechanism: String, user: String, 
password: String): Admin = {

Review comment:
   See above





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480370463



##
File path: 
core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -248,4 +250,25 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
 producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
 createProducer()
   }
+
+  private def createScramAdminClient(user: String, password: String): Admin = {

Review comment:
   Ok, I added the following code to SaslSetup, and we implement that first 
method in the 3 test classes that use this functionality.
   
   ```
   def createPrivilegedAdminClient(): Admin = {
     // create an admin client instance that is authorized to create credentials
     throw new UnsupportedOperationException("Must implement this if a test needs to use it")
   }
   
   def createScramCredentialsViaPrivilegedAdminClient(userName: String, password: String): Unit = {
     val privilegedAdminClient = createPrivilegedAdminClient() // must explicitly implement this method
     try {
       // create the SCRAM credential for the given user
       createScramCredentials(privilegedAdminClient, userName, password)
     } finally {
       privilegedAdminClient.close()
     }
   }
   ```
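   
   For reference, a sketch of what the credential creation itself looks like 
through the Admin API that this KIP adds (the user, password, and iteration 
count are placeholders):
   
   ```
   import java.util.Collections;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.ScramCredentialInfo;
   import org.apache.kafka.clients.admin.ScramMechanism;
   import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
   
   // Sketch: upsert a SCRAM credential for a user via alterUserScramCredentials.
   final class ScramCredentialSketch {
       static void createScramCredential(final Admin admin, final String user,
                                         final String password) throws Exception {
           final ScramCredentialInfo info =
               new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096);
           admin.alterUserScramCredentials(
               Collections.singletonList(new UserScramCredentialUpsertion(user, info, password))
           ).all().get(); // wait for the alteration to complete
       }
   }
   ```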





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10417:
-
Fix Version/s: 2.6.1

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
> Fix For: 2.7.0, 2.6.1
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
> val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
> 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
> .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-31 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187962#comment-17187962
 ] 

John Roesler commented on KAFKA-10434:
--

Marked as a 3.0 blocker so we will be sure to consider removing the methods at 
the time of the 3.0 release. If we decide at that time not to do it yet, we'll 
just move it to the 4.0 release.

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10434:
-
Priority: Blocker  (was: Major)

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10445) Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore

2020-08-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10445:


Assignee: Jorge Esteban Quilcate Otoya

> Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore
> ---
>
> Key: KAFKA-10445
> URL: https://issues.apache.org/jira/browse/KAFKA-10445
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10434:
-
Fix Version/s: 3.0.0

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10384) Separate converters from generated messages

2020-08-31 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10384.
--
Fix Version/s: 2.7.0
   Resolution: Fixed

> Separate converters from generated messages
> ---
>
> Key: KAFKA-10384
> URL: https://issues.apache.org/jira/browse/KAFKA-10384
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 2.7.0
>
>
> Separate the JSON converter classes from the message classes, so that the 
> clients module can be used without Jackson on the CLASSPATH.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480329570



##
File path: 
core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
##
@@ -42,7 +42,18 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   override def setUp(): Unit = {
     super.setUp()
     // Create client credentials after starting brokers so that dynamic credential creation is also tested
-    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
-    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+    createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+    createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+  }
+
+  private def createScramCredentialWithScramAdminClient(user: String, password: String) = {

Review comment:
   It was a goal to eliminate all SCRAM credential creation via ZooKeeper 
where possible.  The only places that do so after this PR are when credentials 
have to be created before the brokers are started (i.e. when the inter-broker 
security protocol is SASL/SCRAM).  This code used to create the credential 
directly via ZooKeeper, but since it occurs after the brokers start it can use 
the admin client.
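
   For context, a minimal sketch of creating a SCRAM credential through the KIP-554 Admin API instead of ZooKeeper; the bootstrap address, user, password, and iteration count below are placeholder values, not the test's actual ones:

   ```java
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.ScramCredentialInfo;
   import org.apache.kafka.clients.admin.ScramMechanism;
   import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

   public class ScramCredentialExample {
       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Create (or update) the credential via the brokers; no ZooKeeper access needed.
           try (Admin admin = Admin.create(props)) {
               final ScramCredentialInfo info =
                   new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096);
               admin.alterUserScramCredentials(Collections.singletonList(
                   new UserScramCredentialUpsertion("some-user", info, "some-password")))
                   .all().get();
           }
       }
   }
   ```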





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480324560



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -1047,8 +1047,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testAddRemoveSaslListeners(): Unit = {
-    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
-    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+    createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
+    createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
     initializeKerberos()

Review comment:
   Good point.  It wasn't waiting before, and it probably didn't/doesn't 
matter since we were spending time initializing Kerberos, but I added the check 
anyway just to be safe.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9649:
--

Assignee: Leah Thomas

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>Assignee: Leah Thomas
>Priority: Major
>  Labels: kip
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if required for backwards 
> compatibility). The ideal solution of course would be to get the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same applies to the {{TimeWindowedDeserializer(final 
> Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.
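
For illustration, a minimal sketch of the safe construction the issue asks people to prefer, using the existing constructor that takes an explicit window size (the String serde and the 5-minute size are example values only):

{noformat}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Passing the window size explicitly avoids the Long.MAX_VALUE default
// used by TimeWindowedDeserializer when no size is given:
final long windowSizeMs = Duration.ofMinutes(5).toMillis();
final Serde<Windowed<String>> windowedSerde =
    new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), windowSizeMs);
{noformat}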



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9649:
---
Labels: kip  (was: )

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>Priority: Major
>  Labels: kip
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if required for backwards 
> compatibility). The ideal solution of course would be to get the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same applies to the {{TimeWindowedDeserializer(final 
> Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480301973



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val utf8 = StandardCharsets.UTF_8.name
+    val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+    var exitStatus: Option[Int] = None
+    // Intercept System.exit so a failed command doesn't kill the test JVM
+    Exit.setExitProcedure { (status, _) =>
+      exitStatus = Some(status)
+      throw new RuntimeException
+    }
+    try {
+      Console.withOut(printStream) {
+        ConfigCommand.main(Array("--bootstrap-server", brokerList) ++ args)
+      }
+      ConfigCommandResult(byteArrayOutputStream.toString(utf8))
+    } catch {
+      case e: Exception => {

Review comment:
   Logging it at debug level doesn't hurt, so I added it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480282139



##
File path: core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
##
@@ -169,6 +169,18 @@ object JaasTestUtils {
 jaasFile
   }
 
+  // Returns a SASL/SCRAM configuration using credentials for the given user 
and password
+  def scramClientLoginModule(mechanism: String, scramUser: String, 
scramPassword: String): String = {
+mechanism match {
+  case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>

Review comment:
   I fixed this in the other places in this file as well





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-31 Thread GitBox


ableegoldman commented on pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#issuecomment-683917229


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-31 Thread GitBox


ableegoldman commented on pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#issuecomment-683916587


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480272048



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -575,10 +577,21 @@ def set_unclean_leader_election(self, topic, value=True, 
node=None):
 node.account.ssh(cmd)
 
     def _connect_setting_kafka_configs(self, node):
+        # Use this for everything related to kafka-configs except User SCRAM Credentials
         if node.version.kafka_configs_command_uses_bootstrap_server():
-            return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol)
+            return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol),

Review comment:
   I created https://issues.apache.org/jira/browse/KAFKA-10451 to track 
this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10451) system tests send large command over ssh instead of using remote file for security config

2020-08-31 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-10451:
-

 Summary: system tests send large command over ssh instead of using 
remote file for security config
 Key: KAFKA-10451
 URL: https://issues.apache.org/jira/browse/KAFKA-10451
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Ron Dagostino


In `kafka.py` the pattern used to supply security configuration information to 
remote CLI tools is to send the information as part of the ssh command.  For 
example, see this --command-config definition:

{{Running ssh command: export 
KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf
 -Djava.security.krb5.conf=/mnt/security/krb5.conf"; 
/opt/kafka-dev/bin/kafka-configs.sh --bootstrap-server worker2:9095 
--command-config <(echo '
ssl.endpoint.identification.algorithm=HTTPS
sasl.kerberos.service.name=kafka
security.protocol=SASL_SSL
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.keystore.password=test-ks-passwd
sasl.mechanism=SCRAM-SHA-256
ssl.truststore.password=test-ts-passwd
ssl.key.password=test-ks-passwd
sasl.mechanism.inter.broker.protocol=GSSAPI
') --entity-name kafka-client --entity-type users --alter --add-config 
SCRAM-SHA-256=[password=client-secret]}}

This ssh command is getting pretty long. It would be best if it referred to a 
file instead of sending the file contents as part of the ssh command.

This happens in a few places in `kafka.py` and should be rectified.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] asdaraujo commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-31 Thread GitBox


asdaraujo commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-683909598


   @mimaison Thanks for the feedback. I've refactored the tests. Could you 
please give it another review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-31 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480267347



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -190,24 +211,19 @@ public void close() {
     public void testReplication() throws InterruptedException {
 
         // create consumers before starting the connectors so we don't need to wait for discovery
-        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "backup.test-topic-1");
         consumer1.poll(Duration.ofMillis(500));
         consumer1.commitSync();
         consumer1.close();
 
-        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
+        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "primary.test-topic-1");

Review comment:
   Good catch. Also changed this and we're now consuming it only once.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-31 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480267122



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one
+        produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, "message-1-");
+        produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, "message-2-");
+
+        consumerProps = new HashMap<String, Object>() {{

Review comment:
   I changed the way this is done, setting it at the test-level, with 
test-specific CGs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-31 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480266459



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-2-" + i);
-}
+// produce to all partitions but the last one

Review comment:
   I've separated the test from the existing ones, also using a different 
topic. Some of the logic in those tests is complex and may be hard to follow, so 
I thought it would be better to have the tests totally separate and simpler to 
interpret.
   
   I think, and hope, it's easier to understand now than it was before.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-31 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r480265451



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept
         time.sleep(5000);
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+        consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2");
 
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
+    }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) {
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions but the last one

Review comment:
   Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jigar Naik updated KAFKA-10450:
---
Priority: Blocker  (was: Critical)

> console-producer throws Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender) 
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> ---
>
> Key: KAFKA-10450
> URL: https://issues.apache.org/jira/browse/KAFKA-10450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: Kafka Version 2.6.0
> MacOS Version - macOS Catalina 10.15.6 (19G2021)
> java version "11.0.8" 2020-07-14 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
>Reporter: Jigar Naik
>Priority: Blocker
>
> Kafka-console-producer.sh gives the error below on Mac: 
> ERROR [Producer clientId=console-producer] Uncaught error in kafka producer 
> I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> *Steps to reproduce the issue.* 
> Download Kafka from 
> [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
>  
> Change the data and log directories (optional)
> Create the topic using the command below 
>  
> {code:java}
> ./kafka-topics.sh \
>  --create \
>  --zookeeper localhost:2181 \
>  --replication-factor 1 \
>  --partitions 1 \
>  --topic my-topic{code}
>  
> Start a Kafka console consumer using the command below
>  
> {code:java}
> ./kafka-console-consumer.sh \
>  --topic my-topic \
>  --from-beginning \
>  --bootstrap-server localhost:9092{code}
>  
> Running the console producer then gives the output below
>  
> {code:java}
> ./kafka-console-producer.sh \
>  --topic my-topic \
>      --bootstrap-server 127.0.0.1:9092
> >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] 
> >Uncaught error in kafka producer I/O thread:  
> >(org.apache.kafka.clients.producer.internals.Sender)
> java.nio.BufferUnderflowException
> at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
> at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
> at 
> org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
> at 
> org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught 
> error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> at 
> org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
> at 
> org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap 
> broker 127.0.0.1:9092 (id: -1 rack: null) disconnected 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
>  
> The same steps work fine with Kafka version 2.0.0 on Mac. 
> The same steps work fine with Kafka version 2.6.0 on Windows. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jigar Naik updated KAFKA-10450:
---
Priority: Critical  (was: Blocker)

> console-producer throws Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender) 
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> ---
>
> Key: KAFKA-10450
> URL: https://issues.apache.org/jira/browse/KAFKA-10450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: Kafka Version 2.6.0
> MacOS Version - macOS Catalina 10.15.6 (19G2021)
> java version "11.0.8" 2020-07-14 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
>Reporter: Jigar Naik
>Priority: Critical
>
> Kafka-console-producer.sh gives the error below on Mac: 
> ERROR [Producer clientId=console-producer] Uncaught error in kafka producer 
> I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> *Steps to reproduce the issue.* 
> Download Kafka from 
> [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
>  
> Change the data and log directories (optional)
> Create the topic using the command below 
>  
> {code:java}
> ./kafka-topics.sh \
>  --create \
>  --zookeeper localhost:2181 \
>  --replication-factor 1 \
>  --partitions 1 \
>  --topic my-topic{code}
>  
> Start a Kafka console consumer using the command below
>  
> {code:java}
> ./kafka-console-consumer.sh \
>  --topic my-topic \
>  --from-beginning \
>  --bootstrap-server localhost:9092{code}
>  
> Running the console producer then gives the output below
>  
> {code:java}
> ./kafka-console-producer.sh \
>  --topic my-topic \
>      --bootstrap-server 127.0.0.1:9092
> >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] 
> >Uncaught error in kafka producer I/O thread:  
> >(org.apache.kafka.clients.producer.internals.Sender)
> java.nio.BufferUnderflowException
> at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
> at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
> at 
> org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
> at 
> org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught 
> error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> at 
> org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
> at 
> org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap 
> broker 127.0.0.1:9092 (id: -1 rack: null) disconnected 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
>  
> The same steps work fine with Kafka version 2.0.0 on Mac. 
> The same steps work fine with Kafka version 2.6.0 on Windows. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jigar Naik updated KAFKA-10450:
---
Priority: Blocker  (was: Major)

> console-producer throws Uncaught error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender) 
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> ---
>
> Key: KAFKA-10450
> URL: https://issues.apache.org/jira/browse/KAFKA-10450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: Kafka Version 2.6.0
> MacOS Version - macOS Catalina 10.15.6 (19G2021)
> java version "11.0.8" 2020-07-14 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
>Reporter: Jigar Naik
>Priority: Blocker
>
> Kafka-console-producer.sh gives the error below on Mac: 
> ERROR [Producer clientId=console-producer] Uncaught error in kafka producer 
> I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> *Steps to reproduce the issue.* 
> Download Kafka from 
> [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
>  
> Change the data and log directories (optional)
> Create the topic using the command below 
>  
> {code:java}
> ./kafka-topics.sh \
>  --create \
>  --zookeeper localhost:2181 \
>  --replication-factor 1 \
>  --partitions 1 \
>  --topic my-topic{code}
>  
> Start a Kafka console consumer using the command below
>  
> {code:java}
> ./kafka-console-consumer.sh \
>  --topic my-topic \
>  --from-beginning \
>  --bootstrap-server localhost:9092{code}
>  
> Running the console producer then gives the output below
>  
> {code:java}
> ./kafka-console-producer.sh \
>  --topic my-topic \
>      --bootstrap-server 127.0.0.1:9092
> >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] 
> >Uncaught error in kafka producer I/O thread:  
> >(org.apache.kafka.clients.producer.internals.Sender)
> java.nio.BufferUnderflowException
> at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
> at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
> at 
> org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
> at 
> org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
> at 
> org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught 
> error in kafka producer I/O thread:  
> (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: There are no in-flight requests for node -1
> at 
> org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
> at 
> org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:834)
> [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap 
> broker 127.0.0.1:9092 (id: -1 rack: null) disconnected 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
>  
> The same steps work fine with Kafka version 2.0.0 on Mac. 
> The same steps work fine with Kafka version 2.6.0 on Windows. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no

2020-08-31 Thread Jigar Naik (Jira)
Jigar Naik created KAFKA-10450:
--

 Summary: console-producer throws Uncaught error in kafka producer 
I/O thread:  (org.apache.kafka.clients.producer.internals.Sender) 
java.lang.IllegalStateException: There are no in-flight requests for node -1
 Key: KAFKA-10450
 URL: https://issues.apache.org/jira/browse/KAFKA-10450
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.6.0
 Environment: Kafka Version 2.6.0
MacOS Version - macOS Catalina 10.15.6 (19G2021)
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
Reporter: Jigar Naik


Kafka-console-producer.sh gives the error below on Mac: 

ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O 
thread:  (org.apache.kafka.clients.producer.internals.Sender)

java.lang.IllegalStateException: There are no in-flight requests for node -1

*Steps to reproduce the issue.* 

Download Kafka from 
[kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]
 

Change the data and log directories (optional)

Create the topic using the command below 

 
{code:java}
./kafka-topics.sh \
 --create \
 --zookeeper localhost:2181 \
 --replication-factor 1 \
 --partitions 1 \
 --topic my-topic{code}
 

Start a Kafka console consumer using the command below

 
{code:java}
./kafka-console-consumer.sh \
 --topic my-topic \
 --from-beginning \
 --bootstrap-server localhost:9092{code}
 

Running the console producer then gives the output below

 
{code:java}
./kafka-console-producer.sh \
     --topic my-topic \
     --bootstrap-server 127.0.0.1:9092
>[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] Uncaught 
>error in kafka producer I/O thread:  
>(org.apache.kafka.clients.producer.internals.Sender)
java.nio.BufferUnderflowException
at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
at 
org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
at 
org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
at 
org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught 
error in kafka producer I/O thread:  
(org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: There are no in-flight requests for node -1
at 
org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
at 
org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap 
broker 127.0.0.1:9092 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient)
{code}
 

 

The same steps work fine with Kafka version 2.0.0 on Mac. 

The same steps work fine with Kafka version 2.6.0 on Windows. 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-08-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10448:

Labels: needs-kip  (was: )

> Preserve Source Partition in Kafka Streams from context
> ---
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: satya
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams sink nodes use the default partitioner or can use a 
> custom partitioner, which has to depend on the key/value. I am looking for an 
> enhancement of the sink node that preserves the source partition instead of 
> deriving the partition again from the key/value. One of our use cases has 
> producers with custom partitioners that we don't have access to, as it is a 
> third-party application. Simply preserving the partition through 
> context.partition() would be helpful.
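
For illustration, a sketch of one possible workaround under the current API; the wrapper type and all names below are made up, and an upstream transformer is assumed to stamp context.partition() into each value before it reaches the sink:

{code:java}
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical value wrapper carrying the partition captured upstream
// via context.partition(); not an existing Streams API.
class PartitionedValue<V> {
    final int sourcePartition;
    final V value;

    PartitionedValue(final int sourcePartition, final V value) {
        this.sourcePartition = sourcePartition;
        this.value = value;
    }
}

// Routes each record back to the partition it was read from.
class SourcePartitionPartitioner<K, V> implements StreamPartitioner<K, PartitionedValue<V>> {
    @Override
    public Integer partition(final String topic, final K key,
                             final PartitionedValue<V> value, final int numPartitions) {
        return value.sourcePartition % numPartitions;
    }
}
{code}

Such a partitioner could be attached with {{Produced.streamPartitioner(...)}} on the {{to()}} call, though a first-class option on the sink node (as requested here) would avoid the wrapper entirely.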



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-31 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683863356


   > It seems that the average across multiple runs doesn't change much?
   
   yep, I didn't observe an obvious regression caused by this patch.
   
   > Also, 1 failure in the latest system test run.
   
   ```kafkatest.tests.connect.connect_distributed_test``` was flaky (see 
https://issues.apache.org/jira/browse/KAFKA-10289)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-08-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10448:

Priority: Minor  (was: Critical)

> Preserve Source Partition in Kafka Streams from context
> ---
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: satya
>Priority: Minor
>
> Currently, Kafka Streams sink nodes use the default partitioner or can use a 
> custom partitioner, which has to depend on the key/value. I am looking for an 
> enhancement of the sink node that preserves the source partition instead of 
> deriving the partition again from the key/value. One of our use cases has 
> producers with custom partitioners that we don't have access to, as it is a 
> third-party application. Simply preserving the partition through 
> context.partition() would be helpful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-31 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683859366


   @chia7712 : Thanks for the performance results. It seems that the average 
across multiple runs doesn't change much?
   
   Also, 1 failure in the latest system test run.
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-30--001.1598849124--chia7712--fix_8334_avoid_deadlock--960f19b29/report.html
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-31 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r480194412



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new 

[GitHub] [kafka] jeqo commented on pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-31 Thread GitBox


jeqo commented on pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#issuecomment-683830757


   @ableegoldman key ordering is added to `InMemoryWindowStore` now



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-31 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187747#comment-17187747
 ] 

Sören Henning edited comment on KAFKA-9649 at 8/31/20, 1:49 PM:


Hi, sorry for the late response, vacation came up... :) 

In our case, we observed this issue when grouping a windowed KTable by a new 
key for a subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
  .groupBy(
    (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
    Grouped.with(
      new WindowedSerdes.TimeWindowedSerde<>(
        myTableXYAttributeSerde,
        myTableWindowSize),
      myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by 
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.

When not passing {{myTableWindowSize}} to the 
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which 
actually are of type {{Windowed<KNew>}}, are not assigned the correct end 
timestamps. This issue does not immediately become apparent; only a log 
message is produced:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window 
end time was truncated to Long.MAX
{noformat}


was (Author: soerenhenning):
Hi, sorry for the late response, vacation came up... :) 

In our case, we observed this issue when grouping a windowed KTable by a new 
key for subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
  .groupBy(
    (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
    Grouped.with(
      new WindowedSerdes.TimeWindowedSerde<>(
        myTableXYAttributeSerde,
        myTableWindowSize),
      myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by 
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.

When not passing {{myTableWindowSize}} to the 
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which 
actually are of type {{Windowed<KNew>}}, are not assigned the correct end 
timestamps. This issue does not immediately become apparent; only a log 
message is produced:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window 
end time was truncated to Long.MAX
{noformat}

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>Priority: Major
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if required for backwards 
> compatibility). The ideal solution of course would be to get the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same applies to the {{TimeWindowedDeserializer(final 
> Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2020-08-31 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187747#comment-17187747
 ] 

Sören Henning commented on KAFKA-9649:
--

Hi, sorry for the late response, vacation came up... :) 

In our case, we observed this issue when grouping a windowed KTable by a new 
key for subsequent aggregation:

{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
  .groupBy(
    (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
    Grouped.with(
      new WindowedSerdes.TimeWindowedSerde<>(
        myTableXYAttributeSerde,
        myTableWindowSize),
      myTableValueSerde));
{noformat}

Here, we have a windowed KTable with keys of type {{K}} and want to group it by 
a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}.

When not passing {{myTableWindowSize}} to the 
{{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which 
actually are of type {{Windowed<KNew>}}, are not assigned the correct end 
timestamps. This issue does not immediately become apparent; only a log 
message is produced:

{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window 
end time was truncated to Long.MAX
{noformat}

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>Priority: Major
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if required for backwards 
> compatibility). The ideal solution of course would be to get the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same applies to the {{TimeWindowedDeserializer(final 
> Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino closed pull request #9233: Testing

2020-08-31 Thread GitBox


rondagostino closed pull request #9233:
URL: https://github.com/apache/kafka/pull/9233


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #9233: Testing

2020-08-31 Thread GitBox


rondagostino opened a new pull request #9233:
URL: https://github.com/apache/kafka/pull/9233


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-08-31 Thread GitBox


cadonna commented on pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#issuecomment-683729711


   Call for review: @guozhangwang @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

2020-08-31 Thread GitBox


cadonna opened a new pull request #9232:
URL: https://github.com/apache/kafka/pull/9232


   This commit adds the remaining property-based RocksDB metrics as described 
in KIP-607, except for num-entries-active-mem-table, which was added in PR 
#9177.
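   
   For context, each property-based metric ultimately boils down to a RocksDB 
property lookup. Below is a minimal standalone sketch using the public 
RocksJava API; the property names are standard RocksDB properties, while the 
actual recording code in Streams differs in detail:
   
   ```java
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   
   public class RocksDBPropertyProbe {
       public static void main(final String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/rocksdb-probe")) {
               // Each property-based gauge in KIP-607 maps to one such lookup.
               final long memTableEntries =
                   db.getLongProperty("rocksdb.num-entries-active-mem-table");
               final long sizeAllMemTables =
                   db.getLongProperty("rocksdb.size-all-mem-tables");
               System.out.printf("mem-table entries: %d, mem-tables bytes: %d%n",
                   memTableEntries, sizeAllMemTables);
           }
       }
   }
   ```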
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9231: KAFKA-10447: Migrate tools module to JUnit 5 and mockito

2020-08-31 Thread GitBox


ijuma commented on a change in pull request #9231:
URL: https://github.com/apache/kafka/pull/9231#discussion_r480023644



##
File path: 
tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
##
@@ -62,17 +62,18 @@
 import java.util.List;
 import java.util.Optional;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Tag("integration")

Review comment:
   This test is much slower than the rest in `tools`, so I think it makes 
sense to mark it as `integration`.
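   
   For readers unfamiliar with JUnit 5 tags, a minimal sketch of how such a 
marker is used and filtered at build time; the test class and the Gradle 
snippet in the comment are illustrative, not from this PR:
   
   ```java
   import org.junit.jupiter.api.Tag;
   import org.junit.jupiter.api.Test;
   
   // Gradle can then exclude tagged tests from the default test task, e.g.:
   //   test { useJUnitPlatform { excludeTags "integration" } }
   @Tag("integration")
   class SlowCoordinatorIT {
       @Test
       void exercisesTheFullCoordinator() {
           // long-running end-to-end assertions would live here
       }
   }
   ```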





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9191: KAFKA-10355: Throw error when source topic was deleted

2020-08-31 Thread GitBox


cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-683673488


   This is the implementation for KIP-662.
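   
   For reference, a minimal sketch of how an application could observe the new 
error, assuming KIP-662's `MissingSourceTopicException` is surfaced to the 
stream thread's uncaught exception handler; the class, handler body, and 
parameter names are illustrative:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.errors.MissingSourceTopicException;
   
   public class MissingTopicAwareApp {
       public static KafkaStreams start(final Topology topology, final Properties props) {
           final KafkaStreams streams = new KafkaStreams(topology, props);
           streams.setUncaughtExceptionHandler((thread, throwable) -> {
               if (throwable instanceof MissingSourceTopicException) {
                   // a source topic was deleted while the application was running
                   System.err.println("Missing source topic: " + throwable.getMessage());
               }
           });
           streams.start();
           return streams;
       }
   }
   ```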



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-31 Thread GitBox


cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-683631452


   @vvcephei Thank you very much for taking care of the conflicts and merging 
the PR!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-31 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683602732


   @junrao the result of ```benchmark_test.py``` is attached (see description)
   
   The main regression ({"records_per_sec": 3653635.3672, "mb_per_sec": 
348.4378} -> {"records_per_sec": 2992220.2274, "mb_per_sec": 285.3604}) happens 
in the test case ```test_id: 
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=snappy```.
 
   
   I re-ran the case 5 times, and the throughput of that case does not appear 
stable. Averaging the five runs gives roughly 3.33M records/sec before vs. 
3.41M after, so there is no consistent regression.
   
   **BEFORE**
   
   1. {"records_per_sec": 3653635.3672, "mb_per_sec": 348.4378} 
   1. {"records_per_sec": 3812428.517, "mb_per_sec": 363.5815}
   1. {"records_per_sec": 3012048.1928, "mb_per_sec": 287.2513}
   1. {"records_per_sec": 3182686.1871, "mb_per_sec": 303.5246}
   1. {"records_per_sec": 2997601.9185, "mb_per_sec": 285.8736}
   
   **AFTER**
   
   1. {"records_per_sec": 2992220.2274, "mb_per_sec": 285.3604}
   1. {"records_per_sec": 3698224.8521, "mb_per_sec": 352.6902}
   1. {"records_per_sec": 2977076.5109, "mb_per_sec": 283.9161}
   1. {"records_per_sec": 3676470.5882, "mb_per_sec": 350.6156}
   1. {"records_per_sec": 3681885.1252, "mb_per_sec": 351.1319}
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-31 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187480#comment-17187480
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] I've tested against PR #8834; it works fine for our scenario. 
Appreciated.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks are long-running, >30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol, the high CPU usage 
> drops after the rebalance is done. But with the cooperative one, the consumer 
> threads seem stuck on something and cannot finish the rebalance, so the high 
> CPU usage does not drop until we stop our load. A small load without 
> long-running tasks also does not cause continuous high CPU usage, as the 
> rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue 
> still exists, but the CPU goes back to normal once the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue. So 
> it seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)