[GitHub] [kafka] showuon commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-16 Thread GitBox


showuon commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r771167832



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -45,10 +46,21 @@ public RemoveMembersFromConsumerGroupOptions() {
 this.members = Collections.emptySet();
 }
 
+/**
+ * Sets an optional reason.
+ */
+public void reason(final String reason) {
+this.reason = reason;

Review comment:
   OK, if it's following the KIP,  then I'm fine with it. Thanks.








[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-16 Thread Shivakumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461241#comment-17461241
 ] 

Shivakumar commented on KAFKA-13077:


[~junrao] could you please suggest what we can do, or recommend any changes, to 
fix this?

> Replication failing after unclean shutdown of ZK and all brokers
> 
>
> Key: KAFKA-13077
> URL: https://issues.apache.org/jira/browse/KAFKA-13077
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Christopher Auston
>Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator 
> violates the constraints Kafka depends on. I don't know if Kafka could or 
> should handle this more gracefully. I decided to file this issue because it 
> was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). 
> By "easy" I mean that I did not go out of my way to corrupt anything, I just 
> was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one 
> running in-sync replica. 
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the 
> cpu/memory requests on both STS and kubernetes proceeded to restart ZK and 
> kafka instances at the same time. If I recall correctly there were some 
> crashes and several restarts but eventually all the instances were running 
> again. It's possible all ZK nodes and all brokers were unavailable at various 
> points.
> The problem I noticed was that two of the brokers were just continually 
> spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager 
> partition=__transaction_state-0] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, 
> dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from 
> (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* 
> (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Loading producer state till offset 2 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [ProducerStateManager 
> partition=__transaction_state-10] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) 
> (kafka.log.Log)
> [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Loading producer state till offset 1 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [ProducerStateManager 
> partition=__transaction_state-20] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) 
> (kafka.log.Log)
> {quote}
> If I describe that topic I can see that several partitions have a leader of 2 
> and the ISR is just 2 (NOTE I added two more brokers and tried to reassign 
> the topic onto brokers 2,3,4 which you can see below). The new brokers also 
> spit out the messages about "non-monotonic update" just like the original 
> followers. This describe output is from the following day.
> {{% kafka-topics.sh ${=BS} -topic __transaction_state -describe}}
> {{Topic: __transaction_state TopicId: i7bBNCeuQMWl-ZMpzrnMAw PartitionCount: 
> 50 ReplicationFactor: 3 Configs: 
> compression.type=uncompressed,min.insync.replicas=3,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=1,max.message.bytes=112,unclean.leader.election.enable=false,retention.bytes=1073741824}}
> {{ Topic: __transaction_state Partition: 0 Leader: 2 

[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-16 Thread Shivakumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461239#comment-17461239
 ] 

Shivakumar commented on KAFKA-13077:


Hi [~junrao], we did not save the DumpLogSegments output during the incident,
but we were able to reproduce the error and got the output below; it should be 
the same as in the error above.
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$ ls -l
total 32
-rw-rw-r-- 1 kafka kafka 0 Dec  9 11:40 .index
-rw-rw-r-- 1 kafka kafka  877 Dec  9 11:40 .log
-rw-rw-r-- 1 kafka kafka   12 Dec  9 11:40 .timeindex
-rw-rw-r-- 1 kafka kafka 10485760 Dec 13 13:20 04804743.index
-rw-rw-r-- 1 kafka kafka  207 Dec 11 21:01 04804743.log
-rw-rw-r-- 1 kafka kafka   10 Dec 11 21:01 04804743.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec 13 13:20 04804743.timeindex
-rw-rw-r-- 1 kafka kafka   10 Dec 13 13:11 04804745.snapshot
-rw-r--r-- 1 kafka kafka  132 Dec 13 13:20 leader-epoch-checkpoint
-rw-rw-r-- 1 kafka kafka   43 Dec  1 11:13 partition.metadata
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$ kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 04804743.index
Dumping 04804743.index
offset: 4804743 position: 0
Mismatches in 
:/var/lib/kafka/data/__consumer_offsets-46/04804743.index
  Index offset: 4804743, log offset: 4804744
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$ kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 04804743.log
Dumping 04804743.log
Starting offset: 4804743
baseOffset: 4804743 lastOffset: 4804744 count: 2 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 368 isTransactional: 
false isControl: false position: 0 CreateTime: 1639256468568 size: 207 magic: 2 
compresscodec: NONE crc: 2267717758 isvalid: true
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$ kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 04804743.timeindex
Dumping 04804743.timeindex
timestamp: 1639256468568 offset: 4804744
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$ kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 04804745.snapshot
Dumping 04804745.snapshot
kafka [ /var/lib/kafka/data/__consumer_offsets-46 ]$

> Replication failing after unclean shutdown of ZK and all brokers
> 
>
> Key: KAFKA-13077
> URL: https://issues.apache.org/jira/browse/KAFKA-13077
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Christopher Auston
>Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator 
> violates the constraints Kafka depends on. I don't know if Kafka could or 
> should handle this more gracefully. I decided to file this issue because it 
> was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). 
> By "easy" I mean that I did not go out of my way to corrupt anything, I just 
> was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one 
> running in-sync replica. 
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the 
> cpu/memory requests on both STS and kubernetes proceeded to restart ZK and 
> kafka instances at the same time. If I recall correctly there were some 
> crashes and several restarts but eventually all the instances were running 
> again. It's possible all ZK nodes and all brokers were unavailable at various 
> points.
> The problem I noticed was that two of the brokers were just continually 
> spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager 
> partition=__transaction_state-0] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, 
> dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from 
> (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* 
> (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Loading producer state till offset 2 with message 
> format version 2 (kafka.log.Log)

[GitHub] [kafka] ableegoldman opened a new pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls

2021-12-16 Thread GitBox


ableegoldman opened a new pull request #11609:
URL: https://github.com/apache/kafka/pull/11609


   Fixes some issues with the NamedTopology version of the IQ methods that 
accept a `topologyName` argument, and adds tests for all of them. Also patches a 
few things regarding the blocking add/removeNamedTopology calls, such as checking 
the waiting futures when removing a thread, and unregistering the thread before 
the other cleanup methods in case any of them hang -- this should prevent a 
hanging StreamThread from making the Future wait longer than it should.






[GitHub] [kafka] ableegoldman merged pull request #11604: MINOR: retry when deleting offsets for named topologies

2021-12-16 Thread GitBox


ableegoldman merged pull request #11604:
URL: https://github.com/apache/kafka/pull/11604


   






[GitHub] [kafka] mjsax commented on a change in pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams

2021-12-16 Thread GitBox


mjsax commented on a change in pull request #11535:
URL: https://github.com/apache/kafka/pull/11535#discussion_r771046685



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -1097,15 +1097,20 @@ long decodeTimestamp(final String encryptedString) {
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
         }
-        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-        final byte version = buffer.get();
-        switch (version) {
-            case LATEST_MAGIC_BYTE:
-                return buffer.getLong();
-            default:
-                log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
-                return RecordQueue.UNKNOWN;
+        try {
+            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+            final byte version = buffer.get();
+            switch (version) {
+                case LATEST_MAGIC_BYTE:
+                    return buffer.getLong();
+                default:
+                    log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
+                        LATEST_MAGIC_BYTE, version);
+                    return RecordQueue.UNKNOWN;
+            }
+        } catch (final IllegalArgumentException argumentException) {
+            log.warn("Unsupported offset metadata found {}", encryptedString);

Review comment:
   Nit: `log.warn("Could not decode offset metadata.")`
   
   I think it's better not to log `encryptedString` as we don't know what's in 
it, and we could potentially leak sensitive information.
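
For reference, a minimal self-contained sketch of the method with the reviewer's suggestion applied. LATEST_MAGIC_BYTE and the UNKNOWN sentinel are stand-in values here, and System.err stands in for the SLF4J logger; this is an illustration of the pattern, not the committed change:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

final class TimestampMetadataDecoder {
    private static final byte LATEST_MAGIC_BYTE = 1; // assumed value, for illustration only
    private static final long UNKNOWN = -1L;          // stand-in for RecordQueue.UNKNOWN

    static long decodeTimestamp(final String encoded) {
        if (encoded.isEmpty()) {
            return UNKNOWN;
        }
        try {
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            final byte version = buffer.get();
            if (version == LATEST_MAGIC_BYTE) {
                return buffer.getLong();
            }
            System.err.println("Unsupported offset metadata version found.");
            return UNKNOWN;
        } catch (final IllegalArgumentException e) {
            // Per the review comment: log a generic message rather than the raw metadata
            // string, which could contain sensitive third-party data.
            System.err.println("Could not decode offset metadata.");
            return UNKNOWN;
        }
    }
}
```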








[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found

2021-12-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461160#comment-17461160
 ] 

Matthias J. Sax commented on KAFKA-13476:
-

{quote}Tracing data was added to metadata because previous releases of the 
Kafka Client and Streams did not use the metadata part of the OffsetAndMetadata 
structure.
{quote}
Thanks for the details. I'll follow up on the PR.

> Streams crashes when non Base64 Offset Metadata is found
> 
>
> Key: KAFKA-13476
> URL: https://issues.apache.org/jira/browse/KAFKA-13476
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Richard Bosch
>Assignee: Richard Bosch
>Priority: Minor
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Kafka Streams applications use the metadata stored with the committed offsets 
> from previous running instances to extract timestamps.
> But when the metadata field contains other data the Base64 decoder will throw 
> an exception causing the Streams application to fail.
> A new Offset commit is then required to stop this failure.
> I've included the part of the log when we started a Kafka Streams app after 
> setting the offsets using a third party tool. This tool adds some tracing 
> metadata so developers and operators could debug who performed this custom 
> offset commit.
>  
> {noformat}
> 2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] 
> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, 
> groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns 
> and assigned partitions
>   at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818)
>  ~[kafka-streams-2.7.0.jar:na]
> 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] 
> org.apache.kafka.streams.KafkaStreams: stream-client 
> [streams-example-app-1] All stream threads have died. The instance will be in 
> error state and should be closed.
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> {noformat}
> I recommend adding a try-catch block around the Base64 decode in the 
> StreamTask.decodeTimestamp method and returning the Unknown value when this 
> occurs.
> This is purely for resilience when bad data is encountered.
> After the Streams application performs a new offset commit the error should 
> not occur again, limiting the chance of frequently occurring warnings in the 
> logs.
> I've already made the changes and added a test for this issue, as I would 
> like to contribute to Kafka.





[jira] [Commented] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft

2021-12-16 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461137#comment-17461137
 ] 

dengziming commented on KAFKA-13552:


Hello [~rndgstn], it seems this is a duplicate of KAFKA-13502.

> Unable to dynamically change broker log levels on KRaft
> ---
>
> Key: KAFKA-13552
> URL: https://issues.apache.org/jira/browse/KAFKA-13552
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ron Dagostino
>Priority: Major
>
> It is currently not possible to dynamically change the log level in KRaft.  
> For example:
> kafka-configs.sh --bootstrap-server  --alter --add-config 
> "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers 
> --entity-name 0
> Results in:
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type BROKER_LOGGER.
> The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). 
>  This needs to be moved out of there, and the functionality has to be 
> processed locally on the broker instead of being forwarded to the KRaft 
> controller.
> It is also an open question as to how we can dynamically alter log levels for 
> a remote KRaft controller.  Connecting directly to it is one possible 
> solution, but that may not be desirable since generally connecting directly 
> to the controller is not necessary.





[GitHub] [kafka] jeffkbkim commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-16 Thread GitBox


jeffkbkim commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r770997666



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -45,10 +46,21 @@ public RemoveMembersFromConsumerGroupOptions() {
 this.members = Collections.emptySet();
 }
 
+/**
+ * Sets an optional reason.
+ */
+public void reason(final String reason) {
+this.reason = reason;

Review comment:
   yeah, i was on the fence for this. reverting








[GitHub] [kafka] kowshik commented on pull request #11605: MINOR: replace lastOption call in LocalLog#flush() to prevent NoSuchElementException

2021-12-16 Thread GitBox


kowshik commented on pull request #11605:
URL: https://github.com/apache/kafka/pull/11605#issuecomment-996266315


   cc @junrao 






[jira] [Created] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft

2021-12-16 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13552:
-

 Summary: Unable to dynamically change broker log levels on KRaft
 Key: KAFKA-13552
 URL: https://issues.apache.org/jira/browse/KAFKA-13552
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.0.0, 3.1.0
Reporter: Ron Dagostino


It is currently not possible to dynamically change the log level in KRaft.  For 
example:

kafka-configs.sh --bootstrap-server  --alter --add-config 
"kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers --entity-name 0

Results in:

org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
type BROKER_LOGGER.

The code to process this request is in ZkAdminManager.alterLogLevelConfigs().  
This needs to be moved out of there, and the functionality has to be processed 
locally on the broker instead of being forwarded to the KRaft controller.

It is also an open question as to how we can dynamically alter log levels for a 
remote KRaft controller.  Connecting directly to it is one possible solution, 
but that may not be desirable since generally connecting directly to the 
controller is not necessary.
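
For context, a minimal sketch of the same operation issued through the Admin API (the bootstrap address is a placeholder); this is the request path that currently fails on KRaft with the error above:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterBrokerLoggerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // BROKER_LOGGER resource for broker id 0, equivalent to the kafka-configs.sh call above
            ConfigResource brokerLogger = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");
            AlterConfigOp setDebug = new AlterConfigOp(
                new ConfigEntry("kafka.server.ReplicaManager", "DEBUG"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> request =
                Collections.singletonMap(brokerLogger, Collections.singletonList(setDebug));
            admin.incrementalAlterConfigs(request).all().get();
        }
    }
}
{code}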





[jira] [Commented] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228

2021-12-16 Thread Brandon Kimbrough (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461105#comment-17461105
 ] 

Brandon Kimbrough commented on KAFKA-13534:
---

I would suggest that this ticket be amended to upgrade to log4j 2.16.0 to 
address 
[CVE-2021-45046|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-45046] 
as well as 
[CVE-2021-44228|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-44228].

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> 
>
> Key: KAFKA-13534
> URL: https://issues.apache.org/jira/browse/KAFKA-13534
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.7.0, 2.8.0, 3.0.0
>Reporter: Sai Kiran Vudutala
>Priority: Major
>
> Log4j has an RCE vulnerability, see 
> [https://www.lunasec.io/docs/blog/log4j-zero-day/]
> References. 
> [https://github.com/advisories/GHSA-jfh8-c2jp-5v3q]
> [https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126]
>  





[GitHub] [kafka] C0urante opened a new pull request #11608: KAFKA-13533: Clean up resources on failed task startup

2021-12-16 Thread GitBox


C0urante opened a new pull request #11608:
URL: https://github.com/apache/kafka/pull/11608


   [Jira](https://issues.apache.org/jira/browse/KAFKA-13533)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ableegoldman commented on a change in pull request #11604: MINOR: retry when deleting offsets for named topologies

2021-12-16 Thread GitBox


ableegoldman commented on a change in pull request #11604:
URL: https://github.com/apache/kafka/pull/11604#discussion_r770929975



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -221,9 +222,14 @@ public RemoveNamedTopologyResult removeNamedTopology(final 
String topologyToRemo
 .getMessage()
 .equals("Deleting offsets of a topic is 
forbidden while the consumer group is actively subscribed to it.")) {
 ex.printStackTrace();
+} else if (ex.getCause() != null &&
+ex.getCause() instanceof 
GroupIdNotFoundException) {
+log.debug("The offsets have been reset and it 
retied again, no longer retrying.");

Review comment:
   ```suggestion
   log.debug("The offsets have been reset by 
another client or the group has been deleted, no need to retry further.");
   ```








[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r770867250



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -45,10 +46,21 @@ public RemoveMembersFromConsumerGroupOptions() {
 this.members = Collections.emptySet();
 }
 
+/**
+ * Sets an optional reason.
+ */
+public void reason(final String reason) {
+this.reason = reason;

Review comment:
   We have to follow the KIP on the naming here. We usually don't use "set" 
here.
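
For illustration, a hedged sketch of the naming convention referenced above, using a hypothetical class name; the accessor pair mirrors the diff, where the mutator is reason(String) rather than setReason(String):

```java
// Hypothetical Options-style class, shown only to illustrate the KIP-style naming:
// the mutator drops the "set" prefix, and a matching no-arg accessor returns the value.
public class RemoveMembersOptionsSketch {
    private String reason;

    /**
     * Sets an optional reason (no "set" prefix, matching the method name in the diff above).
     */
    public void reason(final String reason) {
        this.reason = reason;
    }

    public String reason() {
        return this.reason;
    }
}
```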








[jira] [Resolved] (KAFKA-13551) kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved?

2021-12-16 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13551.
-
Resolution: Information Provided

The impact is described in [https://kafka.apache.org/cve-list]

>  kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved? 
> 
>
> Key: KAFKA-13551
> URL: https://issues.apache.org/jira/browse/KAFKA-13551
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: xiansheng fu
>Priority: Major
>






[jira] [Commented] (KAFKA-13544) Deadlock during shutting down kafka broker because of connectivity problem with zookeeper

2021-12-16 Thread joecqupt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460946#comment-17460946
 ] 

joecqupt commented on KAFKA-13544:
--

I made a pull request to fix it:

[#11607|https://github.com/apache/kafka/pull/11607]

I think we should remove the {{thread.join()}} call.

ChangeNotificationProcessorThread extends ShutdownableThread, so when 
{{thread.shutdown()}} returns, it means the thread has completed its shutdown; 
it is unnecessary to call {{thread.join()}}.
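
For illustration, a minimal, self-contained sketch of the underlying deadlock pattern (plain JDK code, not Kafka's classes): a thread that itself calls System.exit() cannot be join()ed from a shutdown hook, because exit() waits for all hooks to finish while the hook waits for that thread.

{code:java}
public class ShutdownHookJoinDeadlock {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // Stands in for ShutdownableThread hitting a FatalExitError: the worker itself
            // triggers JVM shutdown and then blocks until every shutdown hook has returned.
            System.exit(1);
        }, "worker");

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Stands in for the broker's shutdown hook joining the worker: it waits for
                // a thread that is itself waiting for this hook to return, a mutual wait.
                worker.join();
            } catch (InterruptedException ignored) {
                // shutting down anyway
            }
        }, "shutdown-hook"));

        worker.start();
        worker.join(); // never returns: the JVM hangs, mirroring the stack dump in this ticket
    }
}
{code}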

> Deadlock during shutting down kafka broker because of connectivity problem 
> with zookeeper 
> --
>
> Key: KAFKA-13544
> URL: https://issues.apache.org/jira/browse/KAFKA-13544
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: Andrei Lakhmanets
>Priority: Major
> Attachments: kafka_broker_logs.log, kafka_broker_stackdump.txt
>
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 kafka brokers in different availability zones and 3 
> zookeeper brokers in different availability zones.
> I ran into a deadlock in Kafka. I've attached a stack dump of the Kafka state 
> to this ticket. The locked threads are "feature-zk-node-event-process-thread" 
> and "kafka-shutdown-hook".
> *Context:*
> My kafka cluster had connectivity problems with zookeeper and in the logs I 
> saw the next exception:
> The stacktrace:
> {code:java}
> [2021-12-06 18:31:14,629] WARN Unable to reconnect to ZooKeeper service, 
> session 0x1039563000f has expired (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1039563000f has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO EventThread shut down for session: 
> 0x1039563000f (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,631] INFO [ZooKeeperClient Kafka server] Session 
> expired. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-06 18:31:14,632] ERROR [feature-zk-node-event-process-thread]: 
> Failed to process feature ZK node change event. The broker will eventually 
> exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
> kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either 
> before or while waiting for connection
>     at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:279)
>     at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:261)
>     at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:261)
>     at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1797)
>     at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1767)
>     at 
> kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1762)
>     at kafka.zk.KafkaZkClient.getDataAndStat(KafkaZkClient.scala:771)
>     at kafka.zk.KafkaZkClient.getDataAndVersion(KafkaZkClient.scala:755)
>     at 
> kafka.server.FinalizedFeatureChangeListener$FeatureCacheUpdater.updateLatestOrThrow(FinalizedFeatureChangeListener.scala:74)
>     at 
> kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) {code}
> The exception is thrown in the feature-zk-node-event-process-thread thread and is 
> caught in the method 
> FinalizedFeatureChangeListener.ChangeNotificationProcessorThread.doWork, and 
> the doWork method then throws FatalExitError(1).
> The FatalExitError is caught in the ShutdownableThread.run method, which calls 
> Exit.exit(e.statusCode()), and that calls System.exit under the hood.
> The stackdump of "feature-zk-node-event-process-thread" thread:
> {code:java}
> "feature-zk-node-event-process-thread" #23 prio=5 os_prio=0 cpu=163.19ms 
> elapsed=1563046.32s tid=0x7fd0dcdec800 nid=0x2088 in Object.wait()  
> [0x7fd07e2c1000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(java.base@11.0.11/Native Method)
>     - waiting on 
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1300)
>     - waiting to re-lock in wait() <0x88b9d3c8> (a 
> org.apache.kafka.common.utils.KafkaThread)
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1375)
>     at 
> java.lang.ApplicationShutdownHooks.runHooks(java.base@11.0.11/ApplicationShutdownHooks.java:107)
>     at 
> java.lang.ApplicationShutdownHooks$1.run(java.base@11.0.11/ApplicationShutdownHooks.java:46)
>     at java.lang.Shutdown.runHooks(java.base@11.0.11/Shutdown.java:130)
>     at java.lang.Shutdown.exit(java.base@11.0.11/Shutdown.java:174)
>     - locked 

[GitHub] [kafka] JoeCqupt commented on pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock

2021-12-16 Thread GitBox


JoeCqupt commented on pull request #11607:
URL: https://github.com/apache/kafka/pull/11607#issuecomment-996063587


   call for review @junrao @showuon @dajac @guozhangwang @ijuma 






[GitHub] [kafka] JoeCqupt edited a comment on pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock

2021-12-16 Thread GitBox


JoeCqupt edited a comment on pull request #11607:
URL: https://github.com/apache/kafka/pull/11607#issuecomment-996061715


   ChangeNotificationProcessorThread extends ShutdownableThread, so when 
`thread.shutdown()` returns, it means the thread has completed its shutdown; it 
is unnecessary to call `thread.join()`.






[GitHub] [kafka] JoeCqupt commented on pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock

2021-12-16 Thread GitBox


JoeCqupt commented on pull request #11607:
URL: https://github.com/apache/kafka/pull/11607#issuecomment-996061715


   ChangeNotificationProcessorThread  extends ShutdownableThread, so  when ` 
thread.shutdown()` retrun it means that this thread has shutdownCompleted, it 
is unnecessary  to call `thread.join()`






[GitHub] [kafka] JoeCqupt opened a new pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock

2021-12-16 Thread GitBox


JoeCqupt opened a new pull request #11607:
URL: https://github.com/apache/kafka/pull/11607


   KAFKA-13544

   The shutdown hook causes a deadlock. 
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mimaison commented on a change in pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2021-12-16 Thread GitBox


mimaison commented on a change in pull request #11599:
URL: https://github.com/apache/kafka/pull/11599#discussion_r770769605



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2681,8 +2681,11 @@ public void handleResponse(AbstractResponse abstractResponse) {
 if (descriptions.size() > 0) {
 future.complete(descriptions);
 } else {
-// descriptions will be empty if and only if the user is not authorized to describe cluster resource.
-future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+// Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None
+Errors error = response.data().errorCode() == Errors.NONE.code()

Review comment:
   Absolutely, I've updated the tests.








[GitHub] [kafka] mimaison commented on a change in pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2021-12-16 Thread GitBox


mimaison commented on a change in pull request #11599:
URL: https://github.com/apache/kafka/pull/11599#discussion_r770769143



##
File path: clients/src/main/resources/common/message/DescribeLogDirsRequest.json
##
@@ -19,7 +19,7 @@
   "listeners": ["zkBroker", "broker"],
   "name": "DescribeLogDirsRequest",
   // Version 1 is the same as version 0.
-  "validVersions": "0-2",
+  "validVersions": "0-3",

Review comment:
   Yes, good idea. Done!








[jira] [Resolved] (KAFKA-13492) IQ Parity: queries for key/value store range and scan

2021-12-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13492.
--
Resolution: Fixed

> IQ Parity: queries for key/value store range and scan
> -
>
> Key: KAFKA-13492
> URL: https://issues.apache.org/jira/browse/KAFKA-13492
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>






[GitHub] [kafka] vvcephei merged pull request #11598: KAFKA-13479: Implement range and scan queries

2021-12-16 Thread GitBox


vvcephei merged pull request #11598:
URL: https://github.com/apache/kafka/pull/11598


   






[GitHub] [kafka] vvcephei commented on pull request #11598: feat: Implement range and scan queries

2021-12-16 Thread GitBox


vvcephei commented on pull request #11598:
URL: https://github.com/apache/kafka/pull/11598#issuecomment-996010360


   Failures were unrelated:
   ```
   
   
   Build / ARM / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true | 1 sec | 1
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector | 51 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPreferredReplicaElection, Security=PLAINTEXT | 1 min 17 sec | 1
   Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AclAuthorizerWithZkSaslTest.testAclUpdateWithSessionExpiration()
   ```






[GitHub] [kafka] dajac commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r770737533



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -755,18 +755,17 @@ protected void onJoinPrepare(int generation, String 
memberId) {
 
 @Override
 public void onLeavePrepare() {
-// Save the current Generation and use that to get the memberId, as 
the hb thread can change it at any time
+// Save the current Generation, as the hb thread can change it at any 
time
 final Generation currentGeneration = generation();
-final String memberId = currentGeneration.memberId;
 
-log.debug("Executing onLeavePrepare with generation {} and memberId 
{}", currentGeneration, memberId);
+log.debug("Executing onLeavePrepare with generation {}", 
currentGeneration);
 
 // we should reset assignment and trigger the callback before leaving 
group
 Set droppedPartitions = new 
HashSet<>(subscriptions.assignedPartitions());
 
 if (subscriptions.hasAutoAssignedPartitions() && 
!droppedPartitions.isEmpty()) {
 final Exception e;
-if (generation() == Generation.NO_GENERATION || 
rebalanceInProgress()) {
+if (currentGeneration.equals(Generation.NO_GENERATION) || 
rebalanceInProgress()) {

Review comment:
   I am not sure about this one. Is it correct to compare to 
`Generation.NO_GENERATION` here or do we need to compare to the `generationId`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -705,13 +705,13 @@ protected void onJoinPrepare(int generation, String 
memberId) {
 // so that users can still access the previously owned partitions to 
commit offsets etc.
 Exception exception = null;
 final Set revokedPartitions;
-if (generation == Generation.NO_GENERATION.generationId &&
+if (generation == Generation.NO_GENERATION.generationId ||

Review comment:
   Is `|| memberId.equals(Generation.NO_GENERATION.memberId)` really 
necessary? My understanding is that a reset `memberId` implies that 
`generationId` was also reset. I guess that it does not hurt to have it.
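
For illustration, a self-contained sketch of the distinction being discussed (a hypothetical GenerationSketch class, not the client's actual Generation type): equals() against the NO_GENERATION constant only matches when the memberId (and protocol) were reset too, whereas comparing just the generationId also matches a member whose generation was bumped back to -1 but whose memberId is retained.

```java
import java.util.Objects;

final class GenerationSketch {
    static final GenerationSketch NO_GENERATION = new GenerationSketch(-1, "", null);

    final int generationId;
    final String memberId;
    final String protocolName;

    GenerationSketch(final int generationId, final String memberId, final String protocolName) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.protocolName = protocolName;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof GenerationSketch)) {
            return false;
        }
        final GenerationSketch other = (GenerationSketch) o;
        return generationId == other.generationId
            && memberId.equals(other.memberId)
            && Objects.equals(protocolName, other.protocolName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(generationId, memberId, protocolName);
    }

    public static void main(final String[] args) {
        // Generation id reset, but the member id is still known:
        final GenerationSketch g = new GenerationSketch(-1, "member-1", null);
        System.out.println(g.equals(NO_GENERATION));                      // false
        System.out.println(g.generationId == NO_GENERATION.generationId); // true
    }
}
```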








[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-16 Thread Lucas Bradstreet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460852#comment-17460852
 ] 

Lucas Bradstreet commented on KAFKA-13388:
--

[~dajac] I'm not sure about which approach we should use, but I think we should 
ensure that whatever timeout is used is reasonable for delivery.timeout.ms e.g. 
with delivery.timeout.ms of 120s and request.timeout.ms of 30s you generally 
get 3+ shots at attempts before hitting the delivery timeout. At least unless 
you're stuck in this state.
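
As a rough illustration of that budget (the numbers mirror the comment above; this is not the client's exact retry accounting):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryTimeoutBudget {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // total budget per batch
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);   // per in-flight request
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // Roughly 120000 / (30000 + 100) ~= 3.99: a batch normally gets 3-4 send attempts before
        // delivery.timeout.ms expires, unless the connection is stuck (e.g. in CHECKING_API_VERSIONS)
        // and requests are never actually sent.
        System.out.println(120_000 / (30_000 + 100));
    }
}
{code}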

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Assignee: David Jacot
>Priority: Critical
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-





[jira] [Updated] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)

2021-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13488:

Fix Version/s: (was: 3.2.0)

> Producer fails to recover if topic gets deleted (and gets auto-created)
> ---
>
> Key: KAFKA-13488
> URL: https://issues.apache.org/jira/browse/KAFKA-13488
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
>Reporter: Prateek Agarwal
>Assignee: Prateek Agarwal
>Priority: Blocker
> Fix For: 3.1.0, 3.0.1, 2.8.2
>
> Attachments: KAFKA-13488.patch
>
>
> Producer currently fails to produce messages to a topic if the topic is 
> deleted and gets auto-created OR is created manually during the lifetime of 
> the producer (and certain other conditions are met - leaderEpochs of deleted 
> topic > 0).
>  
> To reproduce, these are the steps which can be carried out:
> 0) A cluster with 2 brokers 0 and 1 with auto.topic.create=true.
> 1) Create a topic T with 2 partitions P0-> (0,1), P1-> (0,1)
> 2) Reassign the partitions such that P0-> (1,0), P1-> (1,0).
> 2) Create a producer P and send few messages which land on all the TPs of 
> topic T.
> 3) Delete the topic T
> 4) Immediately, send a new message from producer P, this message will be 
> failed to send and eventually timed out.
> A test-case which fails with the above steps is added at the end as well as a 
> patch file.
>  
> This happens after leaderEpoch (KIP-320) was introduced in the 
> MetadataResponse KAFKA-7738. There is a solution attempted to fix this issue 
> in KAFKA-12257, but the solution has a bug due to which the above use-case 
> still fails.
>  
> *Issue in the solution of KAFKA-12257:*
> {code:java}
> // org.apache.kafka.clients.Metadata.handleMetadataResponse():
>...
>         Map topicIds = new HashMap<>();
>         Map oldTopicIds = cache.topicIds();
>         for (MetadataResponse.TopicMetadata metadata : 
> metadataResponse.topicMetadata()) {
>             String topicName = metadata.topic();
>             Uuid topicId = metadata.topicId();
>             topics.add(topicName);
>             // We can only reason about topic ID changes when both IDs are 
> valid, so keep oldId null unless the new metadata contains a topic ID
>             Uuid oldTopicId = null;
>             if (!Uuid.ZERO_UUID.equals(topicId)) {
>                 topicIds.put(topicName, topicId);
>                 oldTopicId = oldTopicIds.get(topicName);
>             } else {
>                  topicId = null;
>             }
> ...
> } {code}
> With every new call to {{{}handleMetadataResponse{}}}(), {{cache.topicIds()}} 
> gets created afresh. When a topic is deleted and created immediately soon 
> afterwards (because of auto.create being true), producer's call to 
> {{MetadataRequest}} for the deleted topic T will result in a 
> {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error 
> {{MetadataResponse}} depending on which point of topic recreation metadata is 
> being asked at. In the case of errors, TopicId returned back in the response 
> is {{{}Uuid.ZERO_UUID{}}}. As seen in the above logic, if the topicId 
> received is ZERO, the method removes the earlier topicId entry from the cache.
> Now, when a non-Error Metadata Response does come back for the newly created 
> topic T, it will have a non-ZERO topicId now but the leaderEpoch for the 
> partitions will mostly be ZERO. This situation will lead to rejection of the 
> new MetadataResponse if the older LeaderEpoch was >0 (for more details, refer 
> to KAFKA-12257). Because of the rejection of the metadata, producer will 
> never get to know the new Leader of the TPs of the newly created topic.
>  
> {{*}} 1. Solution / Fix (Preferred){*}:
> Client's metadata should keep on remembering the old topicId till:
> 1) response for the TP has ERRORs
> 2) topicId entry was already present in the cache earlier
> 3) retain time is not expired
> {code:java}
> --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> @@ -336,6 +336,10 @@ public class Metadata implements Closeable {
>  topicIds.put(topicName, topicId);
>  oldTopicId = oldTopicIds.get(topicName);
>  } else {
> +// Retain the old topicId for comparison with newer TopicId 
> created later. This is only needed till retainMs
> +if (metadata.error() != Errors.NONE && 
> oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
> +topicIds.put(topicName, oldTopicIds.get(topicName));
> +else
>  topicId = null;
>  }

[GitHub] [kafka] cadonna commented on pull request #11587: HOTFIX: Bump version of grgit fro 4.1.0 to 4.1.1

2021-12-16 Thread GitBox


cadonna commented on pull request #11587:
URL: https://github.com/apache/kafka/pull/11587#issuecomment-995915599


   Thanks @dajac !






[GitHub] [kafka] cadonna merged pull request #11587: HOTFIX: Bump version of grgit fro 4.1.0 to 4.1.1

2021-12-16 Thread GitBox


cadonna merged pull request #11587:
URL: https://github.com/apache/kafka/pull/11587


   






[jira] [Updated] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)

2021-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13488:

Fix Version/s: 3.0.1
   2.8.2
   3.2.0

> Producer fails to recover if topic gets deleted (and gets auto-created)
> ---
>
> Key: KAFKA-13488
> URL: https://issues.apache.org/jira/browse/KAFKA-13488
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
>Reporter: Prateek Agarwal
>Assignee: Prateek Agarwal
>Priority: Blocker
> Fix For: 3.1.0, 3.0.1, 2.8.2, 3.2.0
>
> Attachments: KAFKA-13488.patch
>
>
> Producer currently fails to produce messages to a topic if the topic is 
> deleted and gets auto-created OR is created manually during the lifetime of 
> the producer (and certain other conditions are met - leaderEpochs of deleted 
> topic > 0).
>  
> To reproduce, these are the steps which can be carried out:
> 0) A cluster with 2 brokers 0 and 1 with auto.topic.create=true.
> 1) Create a topic T with 2 partitions P0-> (0,1), P1-> (0,1)
> 2) Reassign the partitions such that P0-> (1,0), P1-> (1,0).
> 2) Create a producer P and send few messages which land on all the TPs of 
> topic T.
> 3) Delete the topic T
> 4) Immediately, send a new message from producer P, this message will be 
> failed to send and eventually timed out.
> A test-case which fails with the above steps is added at the end as well as a 
> patch file.
>  
> This happens after leaderEpoch (KIP-320) was introduced in the 
> MetadataResponse KAFKA-7738. There is a solution attempted to fix this issue 
> in KAFKA-12257, but the solution has a bug due to which the above use-case 
> still fails.
>  
> *Issue in the solution of KAFKA-12257:*
> {code:java}
> // org.apache.kafka.clients.Metadata.handleMetadataResponse():
>...
>         Map topicIds = new HashMap<>();
>         Map oldTopicIds = cache.topicIds();
>         for (MetadataResponse.TopicMetadata metadata : 
> metadataResponse.topicMetadata()) {
>             String topicName = metadata.topic();
>             Uuid topicId = metadata.topicId();
>             topics.add(topicName);
>             // We can only reason about topic ID changes when both IDs are 
> valid, so keep oldId null unless the new metadata contains a topic ID
>             Uuid oldTopicId = null;
>             if (!Uuid.ZERO_UUID.equals(topicId)) {
>                 topicIds.put(topicName, topicId);
>                 oldTopicId = oldTopicIds.get(topicName);
>             } else {
>                  topicId = null;
>             }
> ...
> } {code}
> With every new call to {{{}handleMetadataResponse{}}}(), {{cache.topicIds()}} 
> gets created afresh. When a topic is deleted and created immediately soon 
> afterwards (because of auto.create being true), producer's call to 
> {{MetadataRequest}} for the deleted topic T will result in a 
> {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error 
> {{MetadataResponse}} depending on which point of topic recreation metadata is 
> being asked at. In the case of errors, TopicId returned back in the response 
> is {{{}Uuid.ZERO_UUID{}}}. As seen in the above logic, if the topicId 
> received is ZERO, the method removes the earlier topicId entry from the cache.
> Now, when a non-Error Metadata Response does come back for the newly created 
> topic T, it will have a non-ZERO topicId now but the leaderEpoch for the 
> partitions will mostly be ZERO. This situation will lead to rejection of the 
> new MetadataResponse if the older LeaderEpoch was >0 (for more details, refer 
> to KAFKA-12257). Because of the rejection of the metadata, producer will 
> never get to know the new Leader of the TPs of the newly created topic.
>  
> {{*}} 1. Solution / Fix (Preferred){*}:
> Client's metadata should keep on remembering the old topicId till:
> 1) response for the TP has ERRORs
> 2) topicId entry was already present in the cache earlier
> 3) retain time is not expired
> {code:java}
> --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> @@ -336,6 +336,10 @@ public class Metadata implements Closeable {
>  topicIds.put(topicName, topicId);
>  oldTopicId = oldTopicIds.get(topicName);
>  } else {
> +// Retain the old topicId for comparison with newer TopicId 
> created later. This is only needed till retainMs
> +if (metadata.error() != Errors.NONE && 
> oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
> +topicIds.put(topicName, oldTopicIds.get(topicName));
> +else
>  

[GitHub] [kafka] dajac commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995910977


   Merged to trunk, 3.1, 3.0 and 2.8.






[jira] [Resolved] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)

2021-12-16 Thread Prateek Agarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prateek Agarwal resolved KAFKA-13488.
-
  Reviewer: David Jacot
Resolution: Fixed

Fixed in https://github.com/apache/kafka/pull/11552

> Producer fails to recover if topic gets deleted (and gets auto-created)
> ---
>
> Key: KAFKA-13488
> URL: https://issues.apache.org/jira/browse/KAFKA-13488
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
>Reporter: Prateek Agarwal
>Assignee: Prateek Agarwal
>Priority: Blocker
> Fix For: 3.1.0
>
> Attachments: KAFKA-13488.patch
>
>
> Producer currently fails to produce messages to a topic if the topic is 
> deleted and gets auto-created OR is created manually during the lifetime of 
> the producer (and certain other conditions are met - leaderEpochs of deleted 
> topic > 0).
>  
> To reproduce, these are the steps which can be carried out:
> 0) A cluster with 2 brokers 0 and 1 with auto topic creation enabled.
> 1) Create a topic T with 2 partitions P0 -> (0,1), P1 -> (0,1).
> 2) Reassign the partitions such that P0 -> (1,0), P1 -> (1,0).
> 3) Create a producer P and send a few messages which land on all the 
> topic-partitions of topic T.
> 4) Delete the topic T.
> 5) Immediately send a new message from producer P; this message will fail 
> to send and eventually time out.
> A test-case which fails with the above steps is added at the end, as well 
> as a patch file. A minimal sketch of these steps is also shown below.
>  
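> For illustration only, a minimal sketch of the steps above using the Java clients; 
> the bootstrap address, topic name and timeout are assumptions and not taken from 
> the attached test-case or patch:
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> 
> public class TopicRecreationRepro {
>     public static void main(String[] args) throws Exception {
>         Properties producerProps = new Properties();
>         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
>         Properties adminProps = new Properties();
>         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> 
>         try (Admin admin = Admin.create(adminProps);
>              Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
>             // Step 3: sends before the deletion succeed and cache the topic metadata
>             // (leaderEpoch > 0 after the reassignment in step 2).
>             producer.send(new ProducerRecord<>("topic", 0, null, "before-delete")).get();
>             // Step 4: delete the topic; with auto topic creation enabled it is recreated
>             // on the producer's next metadata request.
>             admin.deleteTopics(Collections.singleton("topic")).all().get();
>             // Step 5: on affected versions this send times out because the stale
>             // leaderEpoch causes the recreated topic's metadata to be rejected.
>             producer.send(new ProducerRecord<>("topic", 0, null, "after-delete")).get();
>         }
>     }
> }
> {code}
> 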
> This happens since leaderEpoch (KIP-320) was introduced in the 
> MetadataResponse (KAFKA-7738). A fix for this issue was attempted in 
> KAFKA-12257, but that fix has a bug due to which the above use-case 
> still fails.
>  
> *Issue in the solution of KAFKA-12257:*
> {code:java}
> // org.apache.kafka.clients.Metadata.handleMetadataResponse():
>...
>         Map<String, Uuid> topicIds = new HashMap<>();
>         Map<String, Uuid> oldTopicIds = cache.topicIds();
>         for (MetadataResponse.TopicMetadata metadata : 
> metadataResponse.topicMetadata()) {
>             String topicName = metadata.topic();
>             Uuid topicId = metadata.topicId();
>             topics.add(topicName);
>             // We can only reason about topic ID changes when both IDs are 
> valid, so keep oldId null unless the new metadata contains a topic ID
>             Uuid oldTopicId = null;
>             if (!Uuid.ZERO_UUID.equals(topicId)) {
>                 topicIds.put(topicName, topicId);
>                 oldTopicId = oldTopicIds.get(topicName);
>             } else {
>                  topicId = null;
>             }
> ...
> } {code}
> With every new call to {{handleMetadataResponse()}}, {{cache.topicIds()}} 
> is rebuilt from scratch. When a topic is deleted and recreated soon 
> afterwards (because auto.create is true), the producer's 
> {{MetadataRequest}} for the deleted topic T results in a 
> {{MetadataResponse}} carrying an {{UNKNOWN_TOPIC_OR_PARTITION}} or 
> {{LEADER_NOT_AVAILABLE}} error, depending on the point during recreation 
> at which the metadata is requested. In the error case, the topicId 
> returned in the response is {{Uuid.ZERO_UUID}}. As the logic above shows, 
> if the received topicId is ZERO, the method drops the earlier topicId 
> entry from the cache.
> Now, when a non-error MetadataResponse does come back for the newly 
> created topic T, it carries a non-ZERO topicId, but the leaderEpoch of 
> its partitions will typically be ZERO. The new MetadataResponse is then 
> rejected if the older leaderEpoch was > 0 (for details, refer to 
> KAFKA-12257). Because the metadata is rejected, the producer never learns 
> the new leaders of the partitions of the newly created topic. A 
> simplified sketch of this acceptance rule is shown below.
>  
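> To make the rejection easier to follow, an illustrative sketch of the epoch 
> acceptance rule described above (pseudo-code only, not the actual Metadata.java 
> implementation; Uuid is org.apache.kafka.common.Uuid):
> {code:java}
> // Returns true if the new partition metadata should replace the cached one.
> static boolean shouldAcceptNewMetadata(Optional<Integer> lastSeenEpoch, int newEpoch,
>                                        Uuid cachedTopicId, Uuid newTopicId) {
>     if (cachedTopicId != null && newTopicId != null && !cachedTopicId.equals(newTopicId)) {
>         // The topic was recreated with a new ID: accept the new epoch even if it is lower.
>         return true;
>     }
>     // Same or unknown topic ID: only non-decreasing epochs are accepted. Because the
>     // error response dropped the cached topic ID, this branch is taken and epoch 0 of
>     // the recreated topic loses against the cached epoch > 0, so the metadata is rejected.
>     return !lastSeenEpoch.isPresent() || newEpoch >= lastSeenEpoch.get();
> }
> {code}
> 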
> *1. Solution / Fix (Preferred)*:
> The client's metadata should keep remembering the old topicId as long as:
> 1) the response for the topic-partition has errors,
> 2) a topicId entry was already present in the cache, and
> 3) the retain time has not expired.
> {code:java}
> --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> @@ -336,6 +336,10 @@ public class Metadata implements Closeable {
>  topicIds.put(topicName, topicId);
>  oldTopicId = oldTopicIds.get(topicName);
>  } else {
> +// Retain the old topicId for comparison with newer TopicId 
> created later. This is only needed till retainMs
> +if (metadata.error() != Errors.NONE && 
> oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
> +topicIds.put(topicName, oldTopicIds.get(topicName));
> +   

[GitHub] [kafka] dajac merged pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac merged pull request #11552:
URL: https://github.com/apache/kafka/pull/11552


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13547) Kafka - 1.0.0 | Remove log4j.jar

2021-12-16 Thread masood (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460780#comment-17460780
 ] 

masood commented on KAFKA-13547:


[~dongjin] 
Thanks for the response. 
I would like to know a bit more about that change. 
Do we have an ETA for it? 
Will it be available in the next release? 
Are you going to make the log4j logging optional? 

> Kafka - 1.0.0 | Remove log4j.jar
> 
>
> Key: KAFKA-13547
> URL: https://issues.apache.org/jira/browse/KAFKA-13547
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Blocker
>
> We wanted to remove the log4j.jar but ended up with a dependency on the 
> kafka.producer.ProducerConfig.
> Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/Logger
>     at kafka.utils.Logging.logger(Logging.scala:24)
>     at kafka.utils.Logging.logger$(Logging.scala:24)
>     at 
> kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:27)
>     at kafka.utils.Logging.info(Logging.scala:71)
>     at kafka.utils.Logging.info$(Logging.scala:70)
>     at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:218)
>     at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:61)
> Is there any configuration available which can resolve this error?
> Please note we are not using log4j.properties or any other log4j logging 
> mechanism for the Kafka connection in the application.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13551) kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved?

2021-12-16 Thread xiansheng fu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiansheng fu updated KAFKA-13551:
-
Issue Type: Task  (was: Bug)

>  kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved? 
> 
>
> Key: KAFKA-13551
> URL: https://issues.apache.org/jira/browse/KAFKA-13551
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: xiansheng fu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-16 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770558946



##
File path: 
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
*/
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; 
override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = 
generateConfigs
+

Review comment:
   > a simpler way of doing this where subclasses who want the old configs 
just do super.generateConfigs and then modify what that function returns
   
   The `generateConfigs()` method is abstract, so `super.generateConfigs` isn't 
there.  The real issue here is that the class generally doesn't specify its 
log directory -- that's typically auto-generated.  So if we generate configs a 
second time we get a new log directory, and typically we don't want a new one 
-- we want to keep the old one.  So we need the prior generated configs in 
order to know that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11564: MINOR: improve consoleProducer option description

2021-12-16 Thread GitBox


showuon commented on pull request #11564:
URL: https://github.com/apache/kafka/pull/11564#issuecomment-995788843


   @mimaison , this improves the console producer option descriptions. I think it's 
good to merge this PR before you start your KIP-810 work. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13551) kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved?

2021-12-16 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460651#comment-17460651
 ] 

Dongjin Lee commented on KAFKA-13551:
-

In short, NO. CVE-2021-44228 is problematic only when you are using JMS 
appender.

disclaimer: I am currently working on log4j2 migration, KAFKA-9366 and 
KAFKA-12399.

>  kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved? 
> 
>
> Key: KAFKA-13551
> URL: https://issues.apache.org/jira/browse/KAFKA-13551
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: xiansheng fu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-16 Thread GitBox


mimaison commented on pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#issuecomment-995703478


   Thanks for the review @tombentley! I've pushed an update addressing your 
findings.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-16 Thread GitBox


mimaison commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r770452647



##
File path: 
core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.network
+
+import kafka.server.{BaseRequestTest, Defaults, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.common.network.ListenerName
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+class DynamicNumNetworkThreadsTest extends BaseRequestTest {
+
+  override def brokerCount = 1
+
+  val internal = "PLAINTEXT"
+  val external = "EXTERNAL"
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ListenersProp, s"$internal://localhost:0, $external://localhost:0")
+    properties.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$internal:PLAINTEXT, $external:PLAINTEXT")
+    properties.put(s"listener.name.${internal.toLowerCase}.${KafkaConfig.NumNetworkThreadsProp}", "2")
+  }
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+    TestUtils.createTopic(zkClient, "test", brokerCount, brokerCount, servers)
+    assertEquals(2, getNumNetworkThreads(internal))
+    assertEquals(Defaults.NumNetworkThreads, getNumNetworkThreads(external))
+  }
+
+  def getNumNetworkThreads(listener: String): Int = {
+    brokers.head.metrics.metrics().keySet().asScala
+      .filter(_.name() == "request-rate")
+      .count(listener == _.tags().get("listener"))
+  }
+
+  @Test
+  def testDynamicNumNetworkThreads(): Unit = {

Review comment:
   The control plane always has a single processor thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-16 Thread GitBox


mimaison commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r770451653



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -95,20 +95,22 @@ class SocketServer(val config: KafkaConfig,
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
   // control-plane
-  private var controlPlaneProcessorOpt : Option[Processor] = None
-  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+  private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
+    new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))
 
-  private var nextProcessorId = 0
+  private val nextPId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
   private var startedProcessingRequests = false
   private var stoppedProcessingRequests = false
 
+  def nextProcessorId(): Int = {
+    nextPId.getAndIncrement()

Review comment:
   I've renamed the field to `nextProcessorId`, this is not an issue as 
it's private




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-16 Thread GitBox


mimaison commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r770444358



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -920,7 +919,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
   def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
     kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
       // skip the reconfigurable configs
-      !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key)
+      !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key) && !DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)

Review comment:
   Configuring the number of processor threads is unique to the data plane. 
   The control plane always has a single processor to guarantee controller 
requests are handled in order.
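
   For context, a rough sketch of resizing the data-plane processor count for one 
listener at runtime via `incrementalAlterConfigs`, assuming the per-listener 
`listener.name.<listener>.num.network.threads` property name used elsewhere in this 
PR; the broker id and bootstrap address are placeholders, and imports and exception 
handling are omitted:
   
   ```java
   // Sketch only: bump the EXTERNAL listener's network threads on broker 0 to 4.
   Properties props = new Properties();
   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   try (Admin admin = Admin.create(props)) {
       ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
       AlterConfigOp op = new AlterConfigOp(
           new ConfigEntry("listener.name.external.num.network.threads", "4"),
           AlterConfigOp.OpType.SET);
       Collection<AlterConfigOp> ops = Collections.singletonList(op);
       admin.incrementalAlterConfigs(Collections.singletonMap(broker, ops)).all().get();
   }
   ```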




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12980) Allow consumers to return from poll when position advances due to aborted transactions

2021-12-16 Thread Francisco Camenforte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460555#comment-17460555
 ] 

Francisco Camenforte commented on KAFKA-12980:
--

[~ChrisEgerton] are there plans to backport it to 2.8.x/3.0.x? Thanks!

> Allow consumers to return from poll when position advances due to aborted 
> transactions
> --
>
> Key: KAFKA-12980
> URL: https://issues.apache.org/jira/browse/KAFKA-12980
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.2.0
>
>
> When {{Consumer::poll}} is invoked on a topic with an open transaction, and 
> then that transaction is aborted, {{poll}} does not return unless there are 
> other records available in that topic after the aborted transaction.
> Instead, {{poll}} could return in this case, even when no records are 
> available.
> This facilitates reads to the end of a topic where the end offsets of a topic 
> are listed and then a consumer for that topic is polled until its 
> [position|https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)]
>  is at or beyond each of those offsets (for example, [Connect does 
> this|https://github.com/apache/kafka/blob/fce771579c3e20f20949c4c7e0a5e3a16c57c7f0/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L322-L345]
>  when reading to the end of any of its internal topics).
> We could update the existing language in the docs for {{Consumer::poll}} from
> {quote}This method returns immediately if there are records available.
> {quote}
> to
> {quote}This method returns immediately if there are records available or if 
> the position advances past control records.
> {quote}
>  
> A workaround for existing users who would like to see this is to use short 
> poll intervals and manually check the consumer's position in between each 
> poll, but this is fairly tedious and may lead to excess CPU and network 
> utilization depending on the latency requirements for knowing when the end of 
> the topic has been reached.
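> 
> For reference, a rough sketch of that workaround using the Java consumer (imports, 
> consumer construction and error handling omitted; the 100 ms poll timeout is arbitrary):
> {code:java}
> // Reads to the captured end offsets by polling with a short timeout and checking
> // position() in between, since the position advances past aborted transactions even
> // when poll() returns no records.
> static void readToEnd(Consumer<byte[], byte[]> consumer, Collection<TopicPartition> partitions) {
>     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
>     boolean done = false;
>     while (!done) {
>         consumer.poll(Duration.ofMillis(100)); // may return nothing past aborted transactions
>         done = true;
>         for (TopicPartition tp : partitions) {
>             if (consumer.position(tp) < endOffsets.get(tp)) {
>                 done = false;
>                 break;
>             }
>         }
>     }
> }
> {code}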



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11599:
URL: https://github.com/apache/kafka/pull/11599#discussion_r770373244



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2681,8 +2681,11 @@ public void handleResponse(AbstractResponse abstractResponse) {
                 if (descriptions.size() > 0) {
                     future.complete(descriptions);
                 } else {
-                    // descriptions will be empty if and only if the user is not authorized to describe cluster resource.
-                    future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+                    // Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None
+                    Errors error = response.data().errorCode() == Errors.NONE.code()

Review comment:
   Should we extend unit tests in `KafkaAdminTest` to cover this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11599:
URL: https://github.com/apache/kafka/pull/11599#discussion_r770373619



##
File path: clients/src/main/resources/common/message/DescribeLogDirsRequest.json
##
@@ -19,7 +19,7 @@
   "listeners": ["zkBroker", "broker"],
   "name": "DescribeLogDirsRequest",
   // Version 1 is the same as version 0.
-  "validVersions": "0-2",
+  "validVersions": "0-3",

Review comment:
   nit: Could we also put a comment for version 3?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995596844


   > @prat0318 Thanks for the update. I left a few more nits to fix typos. 
Would you have time to quickly address them?
   
   @dajac Thanks again for the review. I have addressed the suggestions. Please 
re-review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995594632


   @prat0318 Thanks for the update. I left a few more nits to fix typos. Would 
you have time to quickly address them?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770363133



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
+  val producerCount: Int = 1
+  val brokerCount: Int = 2
+
+  serverConfig.put(KafkaConfig.NumPartitionsProp, 2.toString)
+  serverConfig.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+  serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+
+  producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
+  producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1.toString)
+  producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1.toString)
+
+  /**
+   * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the partition is included in the metadata.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+    val reassignment = Map(
+      new TopicPartition(topic, 0) -> Seq(1, 0),
+      new TopicPartition(topic, 1) -> Seq(1, 0)
+    )
+
+    // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1

Review comment:
   nit: `Epoch` -> `epoch`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770362464



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
 // Start with a topic with no topic ID
-metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-// We should treat an added topic ID as though it is the same topic. 
Handle only when epoch increases.
-// Don't update to an older one
-metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, 
topicIds);
+// If the Older topic Id is null, we should go with the new TopicId as 
the leader Epoch

Review comment:
   nit: `Older` -> `older`; `topic Id` -> `topic ID`; `TopicId` -> `topic 
ID`; `Epoch` -> `epoch`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770362464



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
 // Start with a topic with no topic ID
-metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
 metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-// We should treat an added topic ID as though it is the same topic. 
Handle only when epoch increases.
-// Don't update to an older one
-metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, 
topicIds);
+// If the Older topic Id is null, we should go with the new TopicId as 
the leader Epoch

Review comment:
   nit: `Older` -> `older`; `topic Id` -> `topic ID`; `TopicId` -> `topic 
ID`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r77033



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,30 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Topic topic-1 is now deleted so Response contains an Error. 
LeaderEpoch should still maintain Old value
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), 
Collections.emptyMap());
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Create topic-1 again but this time with a topic ID bar. LeaderEpoch 
should be updated to new even if lower.

Review comment:
   nit: `Create topic-1 again but this time with a topic ID.`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13551) kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 involved?

2021-12-16 Thread xiansheng fu (Jira)
xiansheng fu created KAFKA-13551:


 Summary:  kafka-log4j-appender-2.1.1.jar Is cve-2021-44228 
involved? 
 Key: KAFKA-13551
 URL: https://issues.apache.org/jira/browse/KAFKA-13551
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: xiansheng fu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-16 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770355018



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,30 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateAfterTopicDeletion() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+Map topicIds = Collections.singletonMap("topic-1", 
Uuid.randomUuid());

Review comment:
   nit: We could have kept the comment before the block to be consistent 
with the other two blocks. I just wanted to rework it a bit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org