Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-19 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1772180903

   CI looks OK


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15658) Zookeeper 3.6.3 jar | CVE-2023-44981

2023-10-19 Thread masood (Jira)
masood created KAFKA-15658:
--

 Summary: Zookeeper 3.6.3 jar | CVE-2023-44981 
 Key: KAFKA-15658
 URL: https://issues.apache.org/jira/browse/KAFKA-15658
 Project: Kafka
  Issue Type: Bug
Reporter: masood


The [CVE-2023-44981|https://www.mend.io/vulnerability-database/CVE-2023-44981]  
vulnerability has been reported in the zookeeper.jar. 

It's worth noting that the latest version of Kafka has a dependency on version 
3.8.2 of Zookeeper, which is also impacted by this vulnerability. 

[https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2]

Could you please verify its impact on Kafka?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15591) Trogdor produce workload reports errors in KRaft mode

2023-10-19 Thread Xi Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1578#comment-1578
 ] 

Xi Yang edited comment on KAFKA-15591 at 10/20/23 6:01 AM:
---

I printed out the topic description after creating the topic. It looks like the 
partition leaders are correctly elected before Trogdor starts producing messages. 
However, the producer still reports the NOT_LEADER_OR_FOLLOWER error.

Topic desc:(name=foo1, internal=false, partitions=(partition=0, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=1, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=2, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=3, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=4, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=5, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=6, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=7, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=8, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=9, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)), authorizedOperations=null)
Create topics:[foo1-9, foo1-8, foo1-7, foo1-6, foo1-5, foo1-4, foo1-3, foo1-2, 
foo1-1, foo1-0]
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-7, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-7 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-6, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-6 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-5, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-5 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-4, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-4 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broke

[jira] [Commented] (KAFKA-15591) Trogdor produce workload reports errors in KRaft mode

2023-10-19 Thread Xi Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1578#comment-1578
 ] 

Xi Yang commented on KAFKA-15591:
-

I printed out the topic description after creating the topic. It looks like the 
partition leaders are correctly elected before Trogdor starts producing messages. 
However, the producer still reports the NOT_LEADER_OR_FOLLOWER error.

```
Topic desc:(name=foo1, internal=false, partitions=(partition=0, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=1, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=2, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=3, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=4, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=5, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=6, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=7, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=8, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)),(partition=9, 
leader=localhost:9092 (id: 1 rack: null), replicas=localhost:9092 (id: 1 rack: 
null), isr=localhost:9092 (id: 1 rack: null)), authorizedOperations=null)
Create topics:[foo1-9, foo1-8, foo1-7, foo1-6, foo1-5, foo1-4, foo1-3, foo1-2, 
foo1-1, foo1-0]
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-7, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-7 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-6, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-6 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-5, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-5 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 4 on topic-partition foo1-4, retrying (2147483646 
attempts left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender)
[2023-10-20 05:43:42,843] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition foo1-4 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests inten

Re: [PR] KAFKA-15632: Drop the invalid remote log metadata events [kafka]

2023-10-19 Thread via GitHub


kamalcph commented on code in PR #14576:
URL: https://github.com/apache/kafka/pull/14576#discussion_r1364317572


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##
@@ -302,22 +307,26 @@ public void addCopyInProgressSegment(RemoteLogSegmentMetadata remoteLogSegmentMe
 
         RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
         RemoteLogSegmentMetadata existingMetadata = idToSegmentMetadata.get(remoteLogSegmentId);
-        checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
-                remoteLogSegmentMetadata.state());
-
+        boolean isValid = checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
+                remoteLogSegmentMetadata.state(), remoteLogSegmentMetadata.remoteLogSegmentId());
+        if (!isValid) {
+            return;
+        }
         for (Integer epoch : remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()) {
             leaderEpochEntries.computeIfAbsent(epoch, leaderEpoch -> new RemoteLogLeaderEpochState())
                     .handleSegmentWithCopySegmentStartedState(remoteLogSegmentId);
         }
-
         idToSegmentMetadata.put(remoteLogSegmentId, remoteLogSegmentMetadata);
     }
 
-    private void checkStateTransition(RemoteLogSegmentState existingState, RemoteLogSegmentState targetState) {
-        if (!RemoteLogSegmentState.isValidTransition(existingState, targetState)) {
-            throw new IllegalStateException(
-                    "Current state: " + existingState + " can not be transitioned to target state: " + targetState);
+    private boolean checkStateTransition(RemoteLogSegmentState existingState,
+                                         RemoteLogSegmentState targetState,
+                                         RemoteLogSegmentId segmentId) {
+        boolean isValid = RemoteLogSegmentState.isValidTransition(existingState, targetState);
+        if (!isValid) {
+            log.error("Current state: {} can not be transitioned to target state: {}, segmentId: {}. Dropping the event",

Review Comment:
   Logging the error instead of throwing the exception, since throwing would 
stop the internal consumer that reads from the remote log metadata topic.
   
   To clarify, producer `enable.idempotence` is set to true by default since 
v3.2. In our internal cluster, producer idempotence was not enabled and we saw 
out-of-order messages in the internal topic. Once this issue happens, the 
internal consumer stops processing messages and then fails to upload the 
pending segments to remote storage. This issue is not recoverable even after 
broker restarts.
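The idempotence setting mentioned above is an ordinary producer configuration. A minimal sketch of building such a config follows; the `IdempotenceConfig` class name and the broker address are illustrative assumptions, not from the thread:

```java
import java.util.Properties;

public class IdempotenceConfig {
    public static Properties producerProps(String bootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        // Explicitly enable idempotence. It is the default since Kafka 3.2,
        // but older clients (or overridden configs) may have it off, which
        // permits out-of-order writes on retries.
        props.put("enable.idempotence", "true");
        // Idempotence requires acks=all and at most 5 in-flight requests.
        props.put("acks", "all");
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092").getProperty("enable.idempotence"));
    }
}
```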






[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1567#comment-1567
 ] 

Ismael Juma commented on KAFKA-15657:
-

I was wondering the same. We should fix KAFKA-15653 and see if it's the source 
of the issues you have been seeing. I am not aware of any other change that 
would result in that sort of problem.

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a separate issue for), and INVALID_TXN_STATE and 
> UNKNOWN_SERVER_ERROR.
> INVALID_TXN_STATE is being returned even though the partitions have been 
> added to the transaction (AddPartitionsToTxn). Nothing about the code has 
> changed between 3.5 and 3.6, and I have loop-integration-tested this code 
> against 3.5 thousands of times. 3.6 is newly - and always - returning 
> INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I 
> eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the 
> broker logs, the broker indicates that sequence numbers are out of order - 
> but (a) I am repeating requests that were in order (so something on the 
> broker got a little haywire maybe? or maybe this is due to me ignoring 
> invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I 
> am receiving UNKNOWN_SERVER_ERROR.
> I think the main problem is the client unexpectedly receiving 
> INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
> USE on return for some reason.





[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1565#comment-1565
 ] 

Travis Bischel commented on KAFKA-15657:


I'm beginning to suspect that KAFKA-15653 may eventually lead to this; I never 
experience this bug without first experiencing the NPEs while appending. I'll 
wait until KAFKA-15653 is addressed and then loop-test to see if this still occurs.

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a separate issue for), and INVALID_TXN_STATE and 
> UNKNOWN_SERVER_ERROR.
> INVALID_TXN_STATE is being returned even though the partitions have been 
> added to the transaction (AddPartitionsToTxn). Nothing about the code has 
> changed between 3.5 and 3.6, and I have loop-integration-tested this code 
> against 3.5 thousands of times. 3.6 is newly - and always - returning 
> INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I 
> eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the 
> broker logs, the broker indicates that sequence numbers are out of order - 
> but (a) I am repeating requests that were in order (so something on the 
> broker got a little haywire maybe? or maybe this is due to me ignoring 
> invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I 
> am receiving UNKNOWN_SERVER_ERROR.
> I think the main problem is the client unexpectedly receiving 
> INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
> USE on return for some reason.





[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-19 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15629:

Fix Version/s: 3.7.0

> proposal to introduce IQv2 Query Types: TimestampedKeyQuery and 
> TimestampedRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-992: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even though IQv2 hasn't gained widespread adoption yet, an immediate fix might 
> raise compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.
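The type-safety point above can be illustrated with a small sketch. This is hypothetical code, not the actual IQv2 API: separate query methods and a `ValueAndTimestamp` wrapper make the "plain value" vs "value with timestamp" distinction explicit at compile time instead of overloading a single generic `V`:

```java
public class TimestampedQuerySketch {
    // Stand-in for Kafka Streams' ValueAndTimestamp, for illustration only.
    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // plain-kv-store query: always yields a plain V
    static <V> V keyQuery(java.util.Map<String, V> store, String key) {
        return store.get(key);
    }

    // ts-kv-store query: always yields ValueAndTimestamp<V>
    static <V> ValueAndTimestamp<V> timestampedKeyQuery(
            java.util.Map<String, ValueAndTimestamp<V>> store, String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        java.util.Map<String, Integer> plain = java.util.Map.of("k", 7);
        java.util.Map<String, ValueAndTimestamp<Integer>> ts =
                java.util.Map.of("k", new ValueAndTimestamp<>(7, 1697760000000L));
        System.out.println(keyQuery(plain, "k"));                   // 7
        System.out.println(timestampedKeyQuery(ts, "k").timestamp); // 1697760000000
    }
}
```

Asking the plain store for a `ValueAndTimestamp` is then a compile-time error rather than a runtime surprise, which is the essence of the proposal.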





[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-19 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15527:

Fix Version/s: 3.7.0

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Add reverseRange and reverseAll query over kv-store in IQv2
> Update an implementation of the Query interface, introduced in [KIP-796: 
> Interactive Query 
> v2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2]
>  , to support reverseRange and reverseAll.
> Use a bounded query to implement reverseRange and an unbounded query to 
> implement reverseAll.
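The bounded vs unbounded distinction can be sketched outside Kafka entirely. The following illustrative-only Java uses a `TreeMap` as a stand-in for a sorted kv-store; the class and method names are hypothetical, not the IQv2 API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ReverseQuerySketch {
    // reverseRange: bounded [from, to], keys returned in descending order.
    static <V> List<String> reverseRange(NavigableMap<String, V> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).descendingKeySet());
    }

    // reverseAll: unbounded, all keys in descending order.
    static <V> List<String> reverseAll(NavigableMap<String, V> store) {
        return new ArrayList<>(store.descendingKeySet());
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store =
                new TreeMap<>(Map.of("a", 1, "b", 2, "c", 3, "d", 4));
        System.out.println(reverseRange(store, "b", "c")); // [c, b]
        System.out.println(reverseAll(store));             // [d, c, b, a]
    }
}
```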





Re: [PR] KAFKA-14808: fix leaderless partition issue when controller removes u… [kafka]

2023-10-19 Thread via GitHub


github-actions[bot] commented on PR #13451:
URL: https://github.com/apache/kafka/pull/13451#issuecomment-1772023180

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2023-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1550#comment-1550
 ] 

Matthias J. Sax commented on KAFKA-7699:


Happy to support you. The KIP wiki page describes how it works: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
– if you have any questions about it, happy to answer them.

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuations allow scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation interval starts when the 
> punctuation is scheduled; thus, the firing times are non-deterministic, which 
> is undesirable for many use cases (I want a call-back in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuations, too, similar to a cron job: a punctuation would then be 
> triggered at "fixed" times like the beginning of the next hour, independent 
> of when the punctuation was registered.
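The "anchored" behavior described above amounts to computing the delay until the next fixed boundary. A minimal sketch follows; the `AnchoredPunctuation` helper is an illustrative assumption, not part of the Streams API:

```java
import java.time.Duration;
import java.time.Instant;

public class AnchoredPunctuation {
    // Given the current time, compute the delay until the next multiple of
    // `interval` since the epoch (e.g. the top of the next hour), independent
    // of when the punctuation was registered.
    static Duration delayToNextBoundary(Instant now, Duration interval) {
        long intervalMs = interval.toMillis();
        long nowMs = now.toEpochMilli();
        long nextBoundaryMs = ((nowMs / intervalMs) + 1) * intervalMs;
        return Duration.ofMillis(nextBoundaryMs - nowMs);
    }

    public static void main(String[] args) {
        // 00:25:00 UTC with an hourly anchor -> fires in 35 minutes.
        Instant now = Instant.parse("2023-10-20T00:25:00Z");
        System.out.println(delayToNextBoundary(now, Duration.ofHours(1)).toMinutes()); // 35
    }
}
```

Scheduling that first delay and then repeating at the fixed interval gives cron-like firing times regardless of registration time.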





[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1549#comment-1549
 ] 

Ismael Juma commented on KAFKA-15653:
-

cc [~divijvaidya] 

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}





[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1542#comment-1542
 ] 

Travis Bischel edited comment on KAFKA-15653 at 10/20/23 2:55 AM:
--

{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 
(kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" 
because "this.intermediateBufRef" is null 
at 
org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686)
at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149)
at java.base/java.lang.Thread.run(Thread.java:833)
{noformat}


was (Author: twmb):
Not just :
{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 
(kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" 
because "this.intermediateBufRef" is null 
at 
org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLi

Re: [PR] KAFKA-15607:Possible NPE is thrown in MirrorCheckpointTask [kafka]

2023-10-19 Thread via GitHub


hudeqi commented on code in PR #14587:
URL: https://github.com/apache/kafka/pull/14587#discussion_r1366378637


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##
@@ -169,6 +169,33 @@ public void testSyncOffset() {
 "Consumer 2 " + topic2 + " failed");
 }
 
+    @Test
+    public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
+        Map> idleConsumerGroupsOffset = new HashMap<>();
+        Map> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer = "consumer";
+        String topic = "topic";
+        Map ct = new HashMap<>();
+        TopicPartition tp = new TopicPartition(topic, 0);
+        // Simulate other clients such as Sarama resetting the group offset of the target cluster to -1.
+        // At this time, the obtained `OffsetAndMetadata` of the target cluster is null.

Review Comment:
   committed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15607:Possible NPE is thrown in MirrorCheckpointTask [kafka]

2023-10-19 Thread via GitHub


hudeqi commented on PR #14587:
URL: https://github.com/apache/kafka/pull/14587#issuecomment-1771985878

   > Thanks @hudeqi, I think this is reasonable. Do you know why Sarama sets 
offsets to -1? If it's for normal operations and not indicative of something 
wrong, we may not even need to log a warning message in that case and could 
change the check 
[here](https://github.com/apache/kafka/blob/af747fbfed7e81617c3b3ad0e4dc8c857aa9502b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L325)
 from `!targetConsumerOffset.containsKey(topicPartition)` to 
`targetConsumerOffset.get(topicPartition) == null`.
   
   Thanks for your review, @C0urante. In fact, directly resetting to -1 is an 
abnormal operation, whether from Sarama or another client. This problem was 
discovered as follows: when using the Sarama client, we wanted to reset the 
group's offset to the latest, so we passed Sarama's `OffsetNewest` as the 
parameter to the reset-offset method. It turned out that the offset was reset 
to -1, because the value of `OffsetNewest` is -1. For Sarama, resetting to the 
latest should be a different operation, but unlike the Java client, this kind 
of misoperation is not intercepted and handled gracefully. That is how this 
issue surfaced in scenarios like MM2, so I think it is better to add a warn 
log here.
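
   The `containsKey`-vs-null distinction discussed above matters because a Java 
`Map` can hold an explicit null value; a minimal stdlib-only sketch (names are 
illustrative, not the actual MM2 code):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class NullValueLookup {
       public static void main(String[] args) {
           // Illustrative stand-in for targetConsumerOffset: a partition whose
           // OffsetAndMetadata lookup yielded null (e.g. after a reset to -1).
           Map<String, Object> targetConsumerOffset = new HashMap<>();
           targetConsumerOffset.put("topic-0", null);

           // containsKey() is true even though the mapped value is null,
           // so the null must be checked explicitly before dereferencing.
           System.out.println(targetConsumerOffset.containsKey("topic-0")); // true
           System.out.println(targetConsumerOffset.get("topic-0") == null); // true
       }
   }
   ```

   This is why a guard of the form `map.get(key) == null` catches the case that 
`!map.containsKey(key)` misses.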





[jira] [Comment Edited] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1537#comment-1537
 ] 

Travis Bischel edited comment on KAFKA-15657 at 10/20/23 2:35 AM:
--

re: first comment – the client doesn't advance to producing unless 
AddPartitionsToTxn succeeds. If the request partially succeeds, failed 
partitions are stripped and only successfully added partitions are produced. 
The logic is definitely hard to follow if you're not familiar with the code, 
but here's issuing/stripping: 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L442-L498],
 and here's where the request is issued (in the same function as producing – 
before the produce request is issued): 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L316-L357]

Also wrt race condition – these tests also pass against the redpanda binary, 
which has always had KIP-890 semantics / has never allowed transactional 
produce requests unless the partition has been added to the transaction (in 
fact this is part of how I caught some early redpanda bugs with _that_ 
implementation).

 

re: second comment, I'll capture some debug logs so you can see both the client 
logs and the container. The tests currently are using v3. I'm currently running 
this in a loop:

```

docker compose down; sleep 1; docker compose up -d ; sleep 5 ; while go test 
-run Txn/cooperative > logs; do echo whoo; docker compose down; sleep 1; docker 
compose up -d; sleep 5; done

```

Once this fails, I'll upload the logs. This is currently ignoring 
INVALID_RECORD, which I more regularly run into. I may remove gating this to 
just the cooperative test and instead run it against all balancers at once (it 
seems heavier load runs into the problem more frequently).

 

Also this does remind me though, somebody had a feature request that 
deliberately abused the ability to produce before AddPartitionsToTxn was done, 
I need to remove support of this for 3.6+. This _is_ exercised in franz-go's CI 
right now and will fail CI for 3.6+ (see the doc comment on 
[EndBeginTxnUnsafe|https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#EndBeginTxnHow]).

Edit: KAFKA-15653 may be complicating the investigation here, too.


was (Author: twmb):
re: first comment – the client doesn't advance to producing unless 
AddPartitionsToTxn succeeds. If the request partially succeeds, failed 
partitions are stripped and only successfully added partitions are produced. 
The logic is definitely hard to follow if you're not familiar with the code, 
but here's issuing/stripping: 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L442-L498],
 and here's where the request is issued (in the same function as producing – 
before the produce request is issued): 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L316-L357]

Also wrt race condition – these tests also pass against the redpanda binary, 
which has always had KIP-890 semantics / has never allowed transactional 
produce requests unless the partition has been added to the transaction (in 
fact this is part of how I caught some early redpanda bugs with _that_ 
implementation).

 

re: second comment, I'll capture some debug logs so you can see both the client 
logs and the container. The tests currently are using v3. I'm currently running 
this in a loop:

```

docker compose down; sleep 1; docker compose up -d ; sleep 5 ; while go test 
-run Txn/cooperative > logs; do echo whoo; docker compose down; sleep 1; docker 
compose up -d; sleep 5; done

```

Once this fails, I'll upload the logs. This is currently ignoring 
INVALID_RECORD, which I more regularly run into. I may remove gating this to 
just the cooperative test and instead run it against all balancers at once (it 
seems heavier load runs into the problem more frequently).

 

Also this does remind me though, somebody had a feature request that 
deliberately abused the ability to produce before AddPartitionsToTxn was done, 
I need to remove support of this for 3.6+. This _is_ exercised in franz-go's CI 
right now and will fail CI for 3.6+ (see the doc comment on 
[EndBeginTxnUnsafe|https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#EndBeginTxnHow]).

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a se

[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1542#comment-1542
 ] 

Travis Bischel commented on KAFKA-15653:


Not just :
{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 
(kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" 
because "this.intermediateBufRef" is null 
at 
org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686)
at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149)
at java.base/java.lang.Thread.run(Thread.java:833)
{noformat}

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> sc

[jira] [Updated] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-19 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-15653:
---
Summary: NPE in ChunkedByteStream  (was: NPE in ChunkedByteStream.)

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15346: add support for single-Key_single-timestamp IQs with versioned state stores (KIP-960) [kafka]

2023-10-19 Thread via GitHub


aliehsaeedii opened a new pull request, #14596:
URL: https://github.com/apache/kafka/pull/14596

   This PR implements KIP-960.
   





[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1537#comment-1537
 ] 

Travis Bischel commented on KAFKA-15657:


re: first comment – the client doesn't advance to producing unless 
AddPartitionsToTxn succeeds. If the request partially succeeds, failed 
partitions are stripped and only successfully added partitions are produced. 
The logic is definitely hard to follow if you're not familiar with the code, 
but here's issuing/stripping: 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L442-L498],
 and here's where the request is issued (in the same function as producing – 
before the produce request is issued): 
[here|https://github.com/twmb/franz-go/blob/ae169a1f35c2ee6b130c4e520632b33e6c491e0b/pkg/kgo/sink.go#L316-L357]

Also wrt race condition – these tests also pass against the redpanda binary, 
which has always had KIP-890 semantics / has never allowed transactional 
produce requests unless the partition has been added to the transaction (in 
fact this is part of how I caught some early redpanda bugs with _that_ 
implementation).

 

re: second comment, I'll capture some debug logs so you can see both the client 
logs and the container. The tests currently are using v3. I'm currently running 
this in a loop:

```

docker compose down; sleep 1; docker compose up -d ; sleep 5 ; while go test 
-run Txn/cooperative > logs; do echo whoo; docker compose down; sleep 1; docker 
compose up -d; sleep 5; done

```

Once this fails, I'll upload the logs. This is currently ignoring 
INVALID_RECORD, which I more regularly run into. I may remove gating this to 
just the cooperative test and instead run it against all balancers at once (it 
seems heavier load runs into the problem more frequently).

 

Also this does remind me though, somebody had a feature request that 
deliberately abused the ability to produce before AddPartitionsToTxn was done, 
I need to remove support of this for 3.6+. This _is_ exercised in franz-go's CI 
right now and will fail CI for 3.6+ (see the doc comment on 
[EndBeginTxnUnsafe|https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#EndBeginTxnHow]).

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a separate issue for), and INVALID_TXN_STATE and 
> UNKNOWN_SERVER_ERROR.
> INVALID_TXN_STATE is being returned even though the partitions have been 
> added to the transaction (AddPartitionsToTxn). Nothing about the code has 
> changed between 3.5 and 3.6, and I have loop-integration-tested this code 
> against 3.5 thousands of times. 3.6 is newly - and always - returning 
> INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I 
> eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the 
> broker logs, the broker indicates that sequence numbers are out of order - 
> but (a) I am repeating requests that were in order (so something on the 
> broker got a little haywire maybe? or maybe this is due to me ignoring 
> invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I 
> am receiving UNKNOWN_SERVER_ERROR.
> I think the main problem is the client unexpectedly receiving 
> INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
> USE on return for some reason.





[PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2023-10-19 Thread via GitHub


dengziming opened a new pull request, #14595:
URL: https://github.com/apache/kafka/pull/14595

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-19 Thread via GitHub


iit2009060 commented on PR #14483:
URL: https://github.com/apache/kafka/pull/14483#issuecomment-1771965660

   > @iit2009060 , do you have any comments to this PR?
   
   @showuon No, I am good. Thanks @jeel2420 for addressing the review comments.





Re: [PR] MINOR:Remove unused method parameter in ConsumerGroupCommand [kafka]

2023-10-19 Thread via GitHub


showuon merged PR #14585:
URL: https://github.com/apache/kafka/pull/14585





Re: [PR] MINOR:Remove unused method parameter in ConsumerGroupCommand [kafka]

2023-10-19 Thread via GitHub


showuon commented on PR #14585:
URL: https://github.com/apache/kafka/pull/14585#issuecomment-1771961258

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeWithPrefixedAcls(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore[cache=false,
 log=false, supplier=ROCKS_WINDOW, kind=DSL]
   Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testTimeouts()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults()
   ```





Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-19 Thread via GitHub


showuon commented on PR #14483:
URL: https://github.com/apache/kafka/pull/14483#issuecomment-1771958677

   @iit2009060 , do you have any comments to this PR? 





Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]

2023-10-19 Thread via GitHub


showuon merged PR #14573:
URL: https://github.com/apache/kafka/pull/14573





Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]

2023-10-19 Thread via GitHub


showuon commented on PR #14573:
URL: https://github.com/apache/kafka/pull/14573#issuecomment-1771954831

   Ran the CI build 3 times and saw no FetchRequestTest failures. 





[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526
 ] 

Justine Olshan commented on KAFKA-15657:


[~twmb] Can you confirm if the AddPartitionsToTxn calls are succeeding? And 
what version they are using? I am concerned the partitions might not be added 
correctly.

 

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a separate issue for), and INVALID_TXN_STATE and 
> UNKNOWN_SERVER_ERROR.
> INVALID_TXN_STATE is being returned even though the partitions have been 
> added to the transaction (AddPartitionsToTxn). Nothing about the code has 
> changed between 3.5 and 3.6, and I have loop-integration-tested this code 
> against 3.5 thousands of times. 3.6 is newly - and always - returning 
> INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I 
> eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the 
> broker logs, the broker indicates that sequence numbers are out of order - 
> but (a) I am repeating requests that were in order (so something on the 
> broker got a little haywire maybe? or maybe this is due to me ignoring 
> invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I 
> am receiving UNKNOWN_SERVER_ERROR.
> I think the main problem is the client unexpectedly receiving 
> INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
> USE on return for some reason.





[jira] [Commented] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1524#comment-1524
 ] 

Justine Olshan commented on KAFKA-15657:


Hey Travis. INVALID_TXN_STATE likely indicates there was a race condition or a 
bug in the client. In this case, the transaction should abort. This is part of 
the work of KIP-890. 

I wonder if there is a bug in the client that caused hanging (or late messages 
getting through) before and it is just being caught now.

If you want to disable transaction verification, you can do so by setting 
transaction.partition.verification.enable to false in your server config files.
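
For reference, a minimal `server.properties` fragment with this setting (a 
sketch; the rest of the broker config is unchanged):

```properties
# Temporarily disable KIP-890 transaction partition verification
transaction.partition.verification.enable=false
```

This relaxes the new 3.6 verification check broker-wide, so it is best treated 
as a temporary mitigation while the client behavior is investigated.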

> Unexpected errors when producing transactionally in 3.6
> ---
>
> Key: KAFKA-15657
> URL: https://issues.apache.org/jira/browse/KAFKA-15657
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
>
> In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
> (which I created a separate issue for), and INVALID_TXN_STATE and 
> UNKNOWN_SERVER_ERROR.
> INVALID_TXN_STATE is being returned even though the partitions have been 
> added to the transaction (AddPartitionsToTxn). Nothing about the code has 
> changed between 3.5 and 3.6, and I have loop-integration-tested this code 
> against 3.5 thousands of times. 3.6 is newly - and always - returning 
> INVALID_TXN_STATE. If I change the code to retry on INVALID_TXN_STATE, I 
> eventually quickly (always) receive UNKNOWN_SERVER_ERROR. In looking at the 
> broker logs, the broker indicates that sequence numbers are out of order - 
> but (a) I am repeating requests that were in order (so something on the 
> broker got a little haywire maybe? or maybe this is due to me ignoring 
> invalid_txn_state?), _and_ I am not receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I 
> am receiving UNKNOWN_SERVER_ERROR.
> I think the main problem is the client unexpectedly receiving 
> INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
> USE on return for some reason.





[jira] [Created] (KAFKA-15657) Unexpected errors when producing transactionally in 3.6

2023-10-19 Thread Travis Bischel (Jira)
Travis Bischel created KAFKA-15657:
--

 Summary: Unexpected errors when producing transactionally in 3.6
 Key: KAFKA-15657
 URL: https://issues.apache.org/jira/browse/KAFKA-15657
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.6.0
Reporter: Travis Bischel


In loop-testing the franz-go client, I am frequently receiving INVALID_RECORD 
(which I created a separate issue for), and INVALID_TXN_STATE and 
UNKNOWN_SERVER_ERROR.

INVALID_TXN_STATE is being returned even though the partitions have been added 
to the transaction (AddPartitionsToTxn). Nothing about the code has changed 
between 3.5 and 3.6, and I have loop-integration-tested this code against 3.5 
thousands of times. 3.6 is newly - and always - returning INVALID_TXN_STATE. If 
I change the code to retry on INVALID_TXN_STATE, I eventually quickly (always) 
receive UNKNOWN_SERVER_ERROR. In looking at the broker logs, the broker 
indicates that sequence numbers are out of order - but (a) I am repeating 
requests that were in order (so something on the broker got a little haywire 
maybe? or maybe this is due to me ignoring invalid_txn_state?), _and_ I am not 
receiving OUT_OF_ORDER_SEQUENCE_NUMBER, I am receiving UNKNOWN_SERVER_ERROR.

I think the main problem is the client unexpectedly receiving 
INVALID_TXN_STATE, but a second problem here is that OOOSN is being mapped to 
USE on return for some reason.
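
To make the reported semantics concrete, here is a toy model in plain Java 
(not broker code; the names are invented) of the KIP-890-style check the 3.6 
broker applies: a transactional produce to a partition is only accepted once 
that partition has been added via AddPartitionsToTxn:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of transaction partition verification: produces to partitions
// that were never added to the transaction are rejected.
public class TxnVerificationModel {
    private final Set<String> addedPartitions = new HashSet<>();

    // Models a successful AddPartitionsToTxn for one topic-partition.
    void addPartitionToTxn(String topicPartition) {
        addedPartitions.add(topicPartition);
    }

    // Models the broker-side check on a transactional produce request.
    String produce(String topicPartition) {
        if (!addedPartitions.contains(topicPartition)) {
            return "INVALID_TXN_STATE"; // partition never added to the txn
        }
        return "NONE"; // produce accepted
    }

    public static void main(String[] args) {
        TxnVerificationModel txn = new TxnVerificationModel();
        txn.addPartitionToTxn("foo-0");
        System.out.println(txn.produce("foo-0")); // NONE
        System.out.println(txn.produce("foo-1")); // INVALID_TXN_STATE
    }
}
```

The bug report above is that the client observes INVALID_TXN_STATE even for 
partitions it did add, which this model would not produce.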





Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-19 Thread via GitHub


mumrah commented on code in PR #14545:
URL: https://github.com/apache/kafka/pull/14545#discussion_r1366294162


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -146,6 +147,15 @@ void handleTopicsSnapshot(TopicsImage topicsImage, 
KRaftMigrationOperationConsum
 Map> changedPartitions = new 
HashMap<>();
 Map> newPartitions = new 
HashMap<>();
 
+Set pendingTopicDeletions = 
migrationClient.topicClient().readPendingTopicDeletions();
+if (!pendingTopicDeletions.isEmpty()) {
+operationConsumer.accept(
+DELETE_PENDING_TOPIC_DELETION,
+"Delete pending topic deletions",
+migrationState -> 
migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, 
migrationState)

Review Comment:
   Yea, this is in `handleTopicsSnapshot` which is sync'ing the TopicImage to 
ZK. Really it doesn't need to happen each time when we handle a snapshot, but I 
figured putting it here was better than having additional one-off logic at 
migration time 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14264) Refactor coordinator code

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14264:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Refactor coordinator code
> -
>
> Key: KAFKA-14264
> URL: https://issues.apache.org/jira/browse/KAFKA-14264
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.4.0
>
>
> To refactor the consumer, we changed how the coordinator is called.  However, 
> there will be a time period where the old and new implementation need to 
> coexist, so we will need to override some of the methods and create a new 
> implementation of the coordinator.  In particular:
>  # ensureCoordinatorReady needs to be non-blocking or we could just use the 
> sendFindCoordinatorRequest.
>  # joinGroupIfNeeded needs to be broken up into more fine-grained stages for 
> the new implementation to work.
> We also need to create the coordinator state machine.
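The refactor described above calls for an explicit coordinator state machine that advances in non-blocking steps instead of blocking inside ensureCoordinatorReady(). A minimal sketch of that idea follows; the state names and transitions here are hypothetical stand-ins, not the actual implementation:

```java
public class CoordinatorStateSketch {
    enum State { UNJOINED, FINDING_COORDINATOR, JOINING, SYNCING, STABLE }

    private State state = State.UNJOINED;

    // One non-blocking step per invocation; the background loop re-invokes
    // this on each iteration rather than blocking until the group is stable.
    State advance() {
        switch (state) {
            case UNJOINED:            state = State.FINDING_COORDINATOR; break; // send FindCoordinator
            case FINDING_COORDINATOR: state = State.JOINING; break;             // coordinator discovered
            case JOINING:             state = State.SYNCING; break;             // JoinGroup response received
            case SYNCING:             state = State.STABLE; break;              // SyncGroup response received
            case STABLE:              break;                                    // nothing left to do
        }
        return state;
    }
}
```

Breaking joinGroupIfNeeded into discrete stages like these lets the caller interleave other request-manager work between steps.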





[jira] [Updated] (KAFKA-14468) Refactor Commit Logic

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14468:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.5.0
>
>
> Refactor commit logic using the new multi-threaded coordinator construct.





Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-19 Thread via GitHub


mumrah commented on code in PR #14545:
URL: https://github.com/apache/kafka/pull/14545#discussion_r1366292982


##
core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala:
##
@@ -47,8 +47,14 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) 
extends TopicMigrationClie
 if (!interests.contains(TopicVisitorInterest.TOPICS)) {
   throw new IllegalArgumentException("Must specify at least TOPICS in 
topic visitor interests.")
 }
-val topics = zkClient.getAllTopicsInCluster()
-val replicaAssignmentAndTopicIds = 
zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+val allTopics = zkClient.getAllTopicsInCluster()
+val topicDeletions = readPendingTopicDeletions().asScala
+val topicsToMigrated = allTopics -- topicDeletions
+if (topicDeletions.nonEmpty) {
+  warn(s"Found ${topicDeletions.size} pending topic deletions: 
$topicDeletions. These will not be migrated " +

Review Comment:
   Yea, i wondered about that. What about logging each deletion separately at 
TRACE ?






[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 





[jira] [Updated] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler for Consumer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14247:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Implement EventHandler interface and DefaultEventHandler for Consumer
> -
>
> Key: KAFKA-14247
> URL: https://issues.apache.org/jira/browse/KAFKA-14247
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The background thread runs inside of the DefaultEventHandler to consume 
> events from the ApplicationEventQueue and produce events to the 
> BackgroundEventQueue.
> The background thread runnable consists of a loop that tries to poll events 
> from the ApplicationQueue, processes the events if there are any, and polls 
> the networkClient.
> In this implementation, the DefaultEventHandler spawns a thread that runs the 
> BackgroundThreadRunnable.  The runnable, as of the current PR, does the 
> following things:
>  # Initialize the networkClient
>  # Poll ApplicationEvent from the queue if there's any
>  # process the event
>  # poll the networkClient
> PR: https://github.com/apache/kafka/pull/12672
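The loop described above (poll an ApplicationEvent if one is queued, process it, then poll the network client) can be sketched as follows. The class and method names are illustrative stand-ins, not Kafka's actual API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackgroundLoopSketch {
    interface ApplicationEvent { String name(); }

    private final BlockingQueue<ApplicationEvent> applicationEvents = new ArrayBlockingQueue<>(64);
    private int processed = 0;
    private int networkPolls = 0;

    // One iteration of the background thread's loop.
    void runOnce() {
        ApplicationEvent event = applicationEvents.poll(); // non-blocking: may return null
        if (event != null) {
            process(event);
        }
        pollNetworkClient(); // the network is polled on every iteration
    }

    private void process(ApplicationEvent event) { processed++; }
    private void pollNetworkClient() { networkPolls++; }   // stands in for NetworkClient.poll()

    public boolean offer(ApplicationEvent e) { return applicationEvents.offer(e); }
    public int processed() { return processed; }
    public int networkPolls() { return networkPolls; }
}
```

Note that the network client is polled whether or not an event was dequeued, so in-flight requests keep making progress even when the application queue is empty.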





[jira] [Updated] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15270:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Integration tests for AsyncConsumer simple consume case
> ---
>
> Key: KAFKA-15270
> URL: https://issues.apache.org/jira/browse/KAFKA-15270
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-preview
>
> This task involves writing integration tests for covering the simple consume 
> functionality of the AsyncConsumer. This should include validation of the 
> assign, fetch and positions logic.
> Not covering any committed offset functionality as part of this task. 
> Integration tests should have a similar form as the existing 
> PlaintextConsumerTest, but scoped to the simple consume flow. 
>   





[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15306:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 





[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15115:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Implement resetPositions functionality in OffsetsRequestManager
> ---
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for resetting positions in the new OffsetsRequestManager. 
> This task will include a new event for the resetPositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. So it is based on a 
> RESET_POSITIONS event that is submitted to the background thread, and then 
> processed by the ApplicationEventProcessor. The processing itself is done by 
> the OffsetRequestManager given that this will require a LIST_OFFSETS request 
> for the partitions awaiting reset.
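The event flow described above (a typed event submitted to the background thread, dispatched by the ApplicationEventProcessor to the manager that owns the request) can be sketched as below. All names here are hypothetical stubs, not the real Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;

public class ResetPositionsSketch {
    enum EventType { RESET_POSITIONS, VALIDATE_POSITIONS }

    // Stand-in for the OffsetsRequestManager: records which flows were
    // triggered; the real manager would build LIST_OFFSETS requests here.
    static class OffsetsManagerStub {
        final List<EventType> handled = new ArrayList<>();
        void resetPositionsIfNeeded() { handled.add(EventType.RESET_POSITIONS); }
        void validatePositionsIfNeeded() { handled.add(EventType.VALIDATE_POSITIONS); }
    }

    // Stand-in for the ApplicationEventProcessor: routes each event type to
    // the manager responsible for it.
    static class ApplicationEventProcessor {
        private final OffsetsManagerStub manager;
        ApplicationEventProcessor(OffsetsManagerStub manager) { this.manager = manager; }
        void process(EventType event) {
            switch (event) {
                case RESET_POSITIONS: manager.resetPositionsIfNeeded(); break;
                case VALIDATE_POSITIONS: manager.validatePositionsIfNeeded(); break;
            }
        }
    }
}
```

The same dispatch shape covers any event the foreground thread submits; the manager decides what network request, if any, the event translates into.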





[jira] [Updated] (KAFKA-14950) Implement assign() and assignment()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14950:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Implement assign() and assignment()
> ---
>
> Key: KAFKA-14950
> URL: https://issues.apache.org/jira/browse/KAFKA-14950
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.6.0
>
>
> Implement assign() and assignment()





[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15163:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Implement validatePositions functionality for new KafkaConsumer
> ---
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for validating positions in the new OffsetsRequestManager. 
> This task will include a new event for the validatePositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The validate positions implementation will keep the same behaviour as the one 
> in the old consumer, but adapted to the new threading model. So it is based 
> on a VALIDATE_POSITIONS event that is submitted to the background thread, 
> and then processed by the ApplicationEventProcessor. The processing itself is 
> done by the OffsetRequestManager given that this will require an 
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such 
> requests in the OffsetRequestManager, responsible for offset-related requests.





[jira] [Updated] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15164:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Extract reusable logic from OffsetsForLeaderEpochClient
> ---
>
> Key: KAFKA-15164
> URL: https://issues.apache.org/jira/browse/KAFKA-15164
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The OffsetsForLeaderEpochClient class is used for making asynchronous 
> requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for:
>  * preparing the requests
>  * sending them over the network using the network client
>  * handling the response
> The new KafkaConsumer implementation, based on a new threading model, 
> requires the same logic for preparing the requests and handling the 
> responses, with different behaviour for how the request is actually sent.
> This task includes refactoring OffsetsForLeaderEpochClient by extracting out 
> the logic for preparing the requests and handling the responses. No changes 
> in the existing logic, just making the functionality available to be reused.
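The extraction described above amounts to making request preparation and response handling side-effect-free helpers, so each threading model can supply its own "send" step. A minimal sketch, with illustrative names rather than the real Kafka classes:

```java
public class OffsetsForLeaderEpochSketch {
    static class EpochRequestData {
        final int partition;
        final int leaderEpoch;
        EpochRequestData(int partition, int leaderEpoch) {
            this.partition = partition;
            this.leaderEpoch = leaderEpoch;
        }
    }

    static class EpochResponseData {
        final int partition;
        final long endOffset;
        EpochResponseData(int partition, long endOffset) {
            this.partition = partition;
            this.endOffset = endOffset;
        }
    }

    // Pure function: builds the request data; no network I/O happens here,
    // so both the old client and the new background thread can call it.
    static EpochRequestData prepareRequest(int partition, int leaderEpoch) {
        return new EpochRequestData(partition, leaderEpoch);
    }

    // Pure function: interprets the response; in the real client, log
    // truncation detection would go here.
    static long handleResponse(EpochResponseData response) {
        return response.endOffset;
    }
}
```

Because neither helper touches the network, they can be unit-tested and reused without pulling in either threading model.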





[jira] [Updated] (KAFKA-15174) Ensure the correct thread is executing the callbacks

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15174:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Ensure the correct thread is executing the callbacks
> 
>
> Key: KAFKA-15174
> URL: https://issues.apache.org/jira/browse/KAFKA-15174
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> We need to add assertion tests to ensure the correct thread is executing the 
> offset commit callbacks and rebalance callback





[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14196:
--
Labels: new-consumer-threading-should-fix  (was: 
consumer-threading-refactor new-consumer-threading-should-fix)

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.3
>
>
> Several flaky tests under OffsetValidationTest are indicating potential 
> consumer duplication issue, when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  Below shows the failure message:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoked callback seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically, (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD
>  
> https://github.com/apache/kafka/pull/12603
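The first workaround noted above (committing synchronously in the revocation callback) works because the blocking commit closes the window in which acked-but-uncommitted offsets are lost to the new partition owner. A runnable sketch with stubs in place of the real client; with an actual consumer this corresponds to calling consumer.commitSync(currentOffsets) inside ConsumerRebalanceListener.onPartitionsRevoked:

```java
import java.util.HashMap;
import java.util.Map;

public class RevokeCommitSketch {
    final Map<Integer, Long> consumedOffsets = new HashMap<>();  // partition -> next offset to commit
    final Map<Integer, Long> committedOffsets = new HashMap<>();

    // Track the position after each consumed record; the offset to commit is
    // one past the last record processed.
    void onRecordConsumed(int partition, long offset) {
        consumedOffsets.put(partition, offset + 1);
    }

    // Called before partitions are handed to the new owner. A blocking commit
    // here guarantees the acked position is durable before reassignment.
    void onPartitionsRevoked() {
        committedOffsets.putAll(consumedOffsets); // stands in for commitSync(...)
    }
}
```

In the failing run quoted above, records up to offset 3745 were acked before the revoke but the rebalanced consumer restarted at 3739; committing in the callback would have advanced the committed position to 3746 first.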





[jira] [Updated] (KAFKA-14252) Create background thread skeleton for new Consumer threading model

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14252:
--
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Create background thread skeleton for new Consumer threading model
> --
>
> Key: KAFKA-14252
> URL: https://issues.apache.org/jira/browse/KAFKA-14252
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The event handler internally instantiates a background thread to consume 
> ApplicationEvents and produce BackgroundEvents.  In this ticket, we will 
> create a skeleton of the background thread.  We will incrementally add 
> implementation in the future.





[jira] [Commented] (KAFKA-15656) Frequent INVALID_RECORD on Kafka 3.6

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1515#comment-1515
 ] 

Travis Bischel commented on KAFKA-15656:


Note that if I change my code to retry on INVALID_RECORD – and repeat the same 
exact serialization – the produce request will succeed when repeated.

> Frequent INVALID_RECORD on Kafka 3.6
> 
>
> Key: KAFKA-15656
> URL: https://issues.apache.org/jira/browse/KAFKA-15656
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
>Reporter: Travis Bischel
>Priority: Major
> Attachments: invalid_record.log
>
>
> Using this docker-compose.yml:
> {noformat}
> version: "3.7"
> services:
>   kafka:
>     image: bitnami/kafka:latest
>     network_mode: host
>     environment:
>       KAFKA_ENABLE_KRAFT: yes
>       KAFKA_CFG_PROCESS_ROLES: controller,broker
>       KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
>       KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
>       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 
> CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>       KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
>       # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this 
> container on localhost via Docker
>       KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>       KAFKA_CFG_NODE_ID: 1
>       ALLOW_PLAINTEXT_LISTENER: yes
>       KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded 
> UUID{noformat}
> And running franz-go integration tests with KGO_TEST_RF=1, I consistently 
> receive INVALID_RECORD errors.
>  
> Looking at the container logs, I see these problematic log lines:
> {noformat}
> 2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-0 
> (kafka.server.ReplicaManager) 
> org.apache.kafka.common.InvalidRecordException: Invalid negative header key 
> size -25
> [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 
> (kafka.server.ReplicaManager) 
> org.apache.kafka.common.InvalidRecordException: Reached end of input stream 
> before skipping all bytes. Remaining bytes:94
> [2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-1 
> (kafka.server.ReplicaManager) 
> org.apache.kafka.common.InvalidRecordException: Found invalid number of 
> record headers -26
> [2023-10-19 23:33:47,948] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 
> (kafka.server.ReplicaManager) 
> org.apache.kafka.common.InvalidRecordException: Found invalid number of 
> record headers -27
> [2023-10-19 23:33:47,950] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> 0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-22 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: Invalid negative header key 
> size -25
> [2023-10-19 23:33:47,947] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: Found invalid number of 
> record headers -50
> [2023-10-19 23:33:47,959] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition 
> c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 
> (kafka.server.ReplicaManager) 
>  {noformat}
>  
> I modified franz-go with a diff to print the request that was written to the 
> wire once this error occurs. Attached is a v9 produce request. I deserialized 
> it locally and am not seeing the corrupt data that Kafka is printing. It's 
> possible there is a bug in the client, but again, these tests have never 
> received this error pre-Kafka 3.6. It _looks like_ there is either corruption 
> when processing the incoming data, or there is some problematic race 
> condition in the broker - I'm not sure which.





[jira] [Updated] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15631:
--
Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview  (was: 
kip-848-client-support kip-848-e2e kip-848-preview)

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Client consumer should not send a new heartbeat request while there is a 
> previous one in-flight. If a HB is in-flight, we should wait for a response or 
> timeout before sending the next one.
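The guard described above reduces to a single in-flight flag: refuse to send while a previous heartbeat is outstanding, and clear the flag on response or timeout. A minimal sketch; the names are illustrative, not the actual heartbeat request manager API:

```java
public class HeartbeatGuardSketch {
    private boolean inflight = false;

    // Returns true if a new heartbeat request may be sent now.
    boolean maybeSendHeartbeat() {
        if (inflight) {
            return false; // a previous heartbeat is still awaiting a response
        }
        inflight = true;  // mark the new request as outstanding
        return true;
    }

    // Invoked when a response arrives, an error occurs, or the request times out.
    void onResponseOrTimeout() {
        inflight = false;
    }
}
```

The real manager would additionally fold the heartbeat interval into this decision, but the mutual exclusion with the outstanding request is the property this ticket asks for.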





[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Labels: kip-848 kip-848-client-support  (was: consumer-threading-refactor 
kip-848 kip-848-client-support)

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
>
> The client-side assignor provides the logic for the partition assignments 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 assignment RPCs

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Labels: kip-848 kip-848-client-support  (was: consumer-threading-refactor 
kip-848 kip-848-client-support)

> Implement client support for KIP-848 assignment RPCs
> 
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15278:
--
Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview  (was: 
consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e 
kip-848-preview)

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} is already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
>  * Ensure MembershipStateManager is updated in both successful and failure 
> cases, and the state machine is transitioned to the correct state.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15651:
--
Labels: consumer-threading-refactor kip-848-preview  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate auto commit guarantees during Consumer.assign()
> ---
>
> Key: KAFKA-15651
> URL: https://issues.apache.org/jira/browse/KAFKA-15651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> In the {{assign()}} method implementation, both {{KafkaConsumer}} and 
> {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this 
> intentional? [~junrao] asks in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]:
> {quote}Do we guarantee that the new owner of the unsubscribed partitions 
> could pick up the latest committed offset?
> {quote}
> Let's confirm whether the asynchronous approach is acceptable and correct. If 
> it is, great, let's enhance the documentation to briefly explain why. If it 
> is not, let's correct the behavior if it's within the API semantic 
> expectations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15548) Ensure all resources created by the Consumer are close()-ed properly

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15548:
--
Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview  
(was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e 
kip-848-preview)

> Ensure all resources created by the Consumer are close()-ed properly
> 
>
> Key: KAFKA-15548
> URL: https://issues.apache.org/jira/browse/KAFKA-15548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, kip-848-e2e, 
> kip-848-preview
>
> Upon closing of the {{Consumer}} we need to:
>  # Complete pending commits
>  # Revoke assignment (note that revocation involves stopping fetching, 
> committing offsets if auto-commit is enabled, and invoking the 
> onPartitionsRevoked callback)
>  # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the 
> group (or -2 if static member)
>  # Close any fetch sessions on the brokers
>  # Poll the NetworkClient to complete pending I/O
> There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{DefaultBackgroundThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.
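The shutdown ordering listed above can be sketched as a plain sequence of steps; all names below are illustrative placeholders, not the actual consumer internals:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: records the order of the close() steps listed in
// the ticket; the step bodies are stubs, not real client logic.
public class ConsumerCloseSketch {
    final List<String> executed = new ArrayList<>();

    void close(boolean staticMember) {
        completePendingCommits();
        revokeAssignment();                 // stop fetching, commit, onPartitionsRevoked
        leaveGroup(staticMember ? -2 : -1); // epoch on the final heartbeat
        closeFetchSessions();
        pollPendingIo();
    }

    void completePendingCommits() { executed.add("commits"); }
    void revokeAssignment()       { executed.add("revoke"); }
    void leaveGroup(int epoch)    { executed.add("leave(epoch=" + epoch + ")"); }
    void closeFetchSessions()     { executed.add("closeFetch"); }
    void pollPendingIo()          { executed.add("poll"); }
}
```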



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{{}networkClientDelegate.disconnectAsync{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15652) Add unit/integration tests to verify OffsetOutOfRangeException is thrown for OffsetFetcherUtils.getOffsetResetTimestamp()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15652:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Add unit/integration tests to verify OffsetOutOfRangeException is thrown for 
> OffsetFetcherUtils.getOffsetResetTimestamp()
> -
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{{}poll{}}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{{}OffsetOutOfRangeException{}}}.
> {quote}
> I intentionally changed the code so that no exceptions were thrown in 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an 
> empty map. When I ran the unit tests and integration tests, there were no 
> failures, strongly suggesting that there is no coverage of this particular 
> edge case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set partitions) {
> subscriptions.assignFromUser(partitions);
> client.updateMetadata(initialUpdateResponse);
> // A dummy metadata update to ensure valid leader epoch.
> metadata.updateWithCurrentRequestVersion(
> RequestTestUtils.metadataUpdateWithIds(
> "dummy",
> 1, 
> Collections.emptyMap(),
> singletonMap(topicName, 4),
> tp -> validLeaderEpoch, topicIds
> ),
> false,
> 0L
> );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor kip-848-preview  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15636:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Labels: consumer-threading-refactor kip-848-preview  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15534) Propagate client response time when timeout to the request handler

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15534:
--
Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview  
(was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e 
kip-848-preview)

> Propagate client response time when timeout to the request handler
> --
>
> Key: KAFKA-15534
> URL: https://issues.apache.org/jira/browse/KAFKA-15534
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848, kip-848-e2e, 
> kip-848-preview
>
> Currently, we don't have a good way to propagate the response time to the 
> handler when timeout is thrown.
> {code:java}
> unsent.handler.onFailure(new TimeoutException(
> "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); 
> {code}
> The current request managers invoke a system call to retrieve the response 
> time, which is not ideal because it is already available at the network client.
> This is an example of the coordinator request manager:
> {code:java}
> unsentRequest.future().whenComplete((clientResponse, throwable) -> {
> long responseTimeMs = time.milliseconds();
> if (clientResponse != null) {
> FindCoordinatorResponse response = (FindCoordinatorResponse) 
> clientResponse.responseBody();
> onResponse(responseTimeMs, response);
> } else {
> onFailedResponse(responseTimeMs, throwable);
> }
> }); {code}
> But in the {{NetworkClientDelegate}}, we should reuse the {{currentTimeMs}} 
> passed to {{trySend()}} to avoid calling {{time.milliseconds()}} again:
> {code:java}
> private void trySend(final long currentTimeMs) {
> ...
> unsent.handler.onFailure(new TimeoutException(
> "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
> continue;
> }
> } {code}
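The change described in this ticket (reusing the poll loop's single clock read instead of re-reading the system clock inside the handler) can be sketched as follows. The names here mirror the ticket's snippets but are simplified stand-ins, not the real NetworkClientDelegate API:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: UnsentRequest and trySend() are simplified
// stand-ins, not the actual NetworkClientDelegate implementation.
public class TrySendSketch {
    interface FailureHandler {
        // responseTimeMs is supplied by the caller rather than being re-read
        // from the system clock inside the handler.
        void onFailure(long responseTimeMs, Exception cause);
    }

    static final class UnsentRequest {
        final long deadlineMs;
        final FailureHandler handler;
        UnsentRequest(long deadlineMs, FailureHandler handler) {
            this.deadlineMs = deadlineMs;
            this.handler = handler;
        }
    }

    final Queue<UnsentRequest> unsent = new ArrayDeque<>();

    // currentTimeMs is the one timestamp taken at the top of the poll loop;
    // every expired request's handler observes that same value.
    void trySend(long currentTimeMs) {
        UnsentRequest head;
        while ((head = unsent.peek()) != null && currentTimeMs >= head.deadlineMs) {
            unsent.poll();
            head.handler.onFailure(currentTimeMs,
                new RuntimeException("Failed to send request before deadline"));
        }
    }
}
```

With this shape, the handler and the network layer agree on the response time without an extra system call per failed request.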



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15617:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions 
> and testInflightFetchOnPendingPartitions overlap
> --
>
> Key: KAFKA-15617
> URL: https://issues.apache.org/jira/browse/KAFKA-15617
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> In FetcherTest, the two tests testFetchingPendingPartitions and 
> testInflightFetchOnPendingPartitions have significant overlap. Perhaps the 
> former subsumes the latter?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Labels: consumer-threading-refactor kip-848-preview  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15634:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
> 
>
> Key: KAFKA-15634
> URL: https://issues.apache.org/jira/browse/KAFKA-15634
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> What is the point of the code in the initial {{while}} loop since the receive 
> is delayed and thus there is no {{throttleDelayMs}} received in the client?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15635:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848-client-support)

> Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
> -
>
> Key: KAFKA-15635
> URL: https://issues.apache.org/jira/browse/KAFKA-15635
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there 
> is only 1 assigned partition?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15550:
--
Labels: consumer-threading-refactor kip-848 kip-848-preview  (was: 
consumer-threading-refactor kip-848 kip-848-client-support kip-848-preview)

> OffsetsForTimes validation for negative timestamps in new consumer
> --
>
> Key: KAFKA-15550
> URL: https://issues.apache.org/jira/browse/KAFKA-15550
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, kip-848-preview
>
> OffsetsForTimes api call should fail with _IllegalArgumentException_ if 
> negative timestamps are provided as arguments. This will effectively exclude 
> earliest and latest offsets as target times, keeping the current behaviour of 
> the KafkaConsumer.
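The proposed validation can be sketched as below; {{validateTimestamps}} is a hypothetical helper, not an existing KafkaConsumer method, and partitions are represented as plain strings for brevity:

```java
import java.util.Map;

public class OffsetsForTimesValidation {
    // Hypothetical helper: reject negative target timestamps up front, so the
    // sentinel values used internally for earliest/latest offsets can never be
    // passed in through offsetsForTimes().
    static void validateTimestamps(Map<String, Long> timestampsToSearch) {
        for (Map.Entry<String, Long> entry : timestampsToSearch.entrySet()) {
            if (entry.getValue() == null || entry.getValue() < 0) {
                throw new IllegalArgumentException(
                    "Invalid timestamp " + entry.getValue()
                        + " for partition " + entry.getKey()
                        + "; timestamps must be non-negative");
            }
        }
    }
}
```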



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Component/s: unit tests

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Summary: Investigate ConsumerNetworkThreadTest's 
testResetPositionsProcessFailureIsIgnored  (was: Investigate 
ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14724:
--
Component/s: unit tests

> Port tests in FetcherTest to FetchRequestManagerTest
> 
>
> Key: KAFKA-14724
> URL: https://issues.apache.org/jira/browse/KAFKA-14724
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task involves copying the relevant tests from {{FetcherTest}} and 
> modifying them to fit a new unit test named {{{}FetchRequestManagerTest{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15656) Frequent INVALID_RECORD on Kafka 3.6

2023-10-19 Thread Travis Bischel (Jira)
Travis Bischel created KAFKA-15656:
--

 Summary: Frequent INVALID_RECORD on Kafka 3.6
 Key: KAFKA-15656
 URL: https://issues.apache.org/jira/browse/KAFKA-15656
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.6.0
Reporter: Travis Bischel
 Attachments: invalid_record.log

Using this docker-compose.yml:
{noformat}
version: "3.7"
services:
  kafka:
    image: bitnami/kafka:latest
    network_mode: host
    environment:
      KAFKA_ENABLE_KRAFT: yes
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
      # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this container on localhost via Docker
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CFG_NODE_ID: 1
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID
{noformat}
And running franz-go integration tests with KGO_TEST_RF=1, I consistently 
receive INVALID_RECORD errors.

 

Looking at the container logs, I see these problematic log lines:
{noformat}
[2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-0 
(kafka.server.ReplicaManager) 
org.apache.kafka.common.InvalidRecordException: Invalid negative header key 
size -25
[2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 
(kafka.server.ReplicaManager) 
org.apache.kafka.common.InvalidRecordException: Reached end of input stream 
before skipping all bytes. Remaining bytes:94
[2023-10-19 23:33:47,942] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-1 
(kafka.server.ReplicaManager) 
org.apache.kafka.common.InvalidRecordException: Found invalid number of record 
headers -26
[2023-10-19 23:33:47,948] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-6 
(kafka.server.ReplicaManager) 
org.apache.kafka.common.InvalidRecordException: Found invalid number of record 
headers -27
[2023-10-19 23:33:47,950] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
0cf2f3faaafd3f906ea848b684b04833ca162bcd19ecae2cab36767a54f248c7-22 
(kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Invalid negative header key 
size -25
[2023-10-19 23:33:47,947] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 
(kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Found invalid number of record 
headers -50
[2023-10-19 23:33:47,959] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
c63b6e30987317fad18815effb8d432b6df677d2ab56cf6da517bb93fa49b74b-25 
(kafka.server.ReplicaManager) 
 {noformat}
 

I modified franz-go with a diff to print the request that was written to the 
wire once this error occurs. Attached is a v9 produce request. I deserialized 
it locally and am not seeing the corrupt data that Kafka is printing. It's 
possible there is a bug in the client, but again, these tests have never 
received this error pre-Kafka 3.6. It _looks like_ there is either corruption 
when processing the incoming data, or there is some problematic race condition 
in the broker - I'm not sure which.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15617:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions 
> and testInflightFetchOnPendingPartitions overlap
> --
>
> Key: KAFKA-15617
> URL: https://issues.apache.org/jira/browse/KAFKA-15617
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> In FetcherTest, the two tests testFetchingPendingPartitions and 
> testInflightFetchOnPendingPartitions have significant overlap. Perhaps the 
> former subsumes the latter?





[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15636:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?





[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}





[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{{}networkClientDelegate.disconnectAsync{}}}.





[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15634:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
> 
>
> Key: KAFKA-15634
> URL: https://issues.apache.org/jira/browse/KAFKA-15634
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> What is the point of the code in the initial {{while}} loop since the receive 
> is delayed and thus there is no {{throttleDelayMs}} received in the client?





[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set&lt;TopicPartition&gt; partitions) {
> subscriptions.assignFromUser(partitions);
> client.updateMetadata(initialUpdateResponse);
> // A dummy metadata update to ensure valid leader epoch.
> metadata.updateWithCurrentRequestVersion(
> RequestTestUtils.metadataUpdateWithIds(
> "dummy",
> 1, 
> Collections.emptyMap(),
> singletonMap(topicName, 4),
> tp -> validLeaderEpoch, topicIds
> ),
> false,
> 0L
> );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.





[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781





[jira] [Updated] (KAFKA-15652) Add unit/integration tests to verify OffsetOutOfRangeException is thrown for OffsetFetcherUtils.getOffsetResetTimestamp()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15652:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Add unit/integration tests to verify OffsetOutOfRangeException is thrown for 
> OffsetFetcherUtils.getOffsetResetTimestamp()
> -
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{{}poll{}}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{{}OffsetOutOfRangeException{}}}.
> {quote}
> I intentionally changed the code so that no exceptions were thrown in 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an 
> empty map. When I ran the unit tests and integration tests, there were no 
> failures, strongly suggesting that there is no coverage of this particular 
> edge case.





[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15635:
--
Component/s: unit tests
 Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor kip-848-client-support kip-848-preview)

> Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
> -
>
> Key: KAFKA-15635
> URL: https://issues.apache.org/jira/browse/KAFKA-15635
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there 
> is only 1 assigned partition?





[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Summary: Investigate ConsumerNetworkThreadTest's testPollResultTimer  (was: 
Investigate ConsumerNetworkThread's testPollResultTimer)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}





[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Component/s: unit tests

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781





[jira] [Updated] (KAFKA-15277) Design & implement support for internal Consumer delegates

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15277:
--
Labels: consumer-threading-refactor kip-848 kip-848-e2e kip-848-preview  
(was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e 
kip-848-preview)

> Design & implement support for internal Consumer delegates
> --
>
> Key: KAFKA-15277
> URL: https://issues.apache.org/jira/browse/KAFKA-15277
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, kip-848-e2e, 
> kip-848-preview
>
> As mentioned above, there are presently two different, coexisting 
> implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and 
> {{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized 
> using the delegation pattern. The top-level {{KafkaConsumer}} that implements 
> the old protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and 
> {{PrototypeAsyncConsumer}} will be renamed as 
> {{AsyncKafkaConsumerDelegate}}. It is assumed that neither 
> {{AsyncKafkaConsumerDelegate}} nor 
> {{LegacyKafkaConsumerDelegate}} will be top-level implementations 
> of {{Consumer}}, but will likely implement an internal interface that is 
> better suited to the needs of the top-level {{KafkaConsumer}}.
> Provide the Java client support for the consumer delegates, including:
>  * Create {{ConsumerDelegate}} interface
>  * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and 
> refactor to implement {{ConsumerDelegate}}
>  * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and 
> refactor to implement the {{ConsumerDelegate}} interface
>  * Refactor the (original) {{KafkaConsumer}} to remove the core 
> implementation, instead delegating to the {{{}ConsumerDelegate{}}}, which 
> will be hard-coded to use {{LegacyKafkaConsumerDelegate}}
>  * Once available (in KAFKA-15284), use the 
> {{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Component/s: clients
 consumer

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.





[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Component/s: clients
 consumer

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.





[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Component/s: clients
 consumer

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.





[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15551:
--
Component/s: clients

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> These should possibly be short-circuited at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.
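The short-circuit idea above can be sketched as follows. The class and method names here are hypothetical and only illustrate returning early at the API boundary when the offsets map is empty; this is not Kafka's actual implementation:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: commit() returns an already-completed future when
// there is nothing to commit, instead of enqueueing work for the
// background thread. Names are illustrative, not Kafka's real API.
class CommitShortCircuit {
    static <K, V> CompletableFuture<Void> commit(Map<K, V> offsets) {
        if (offsets.isEmpty())
            return CompletableFuture.completedFuture(null); // nothing to do
        // Otherwise an application event would be enqueued here (elided);
        // the future completes once the background thread handles it.
        return new CompletableFuture<>();
    }
}
```

An empty commit then costs one map check on the application thread instead of a round trip through the event queue.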





[jira] [Updated] (KAFKA-15553) Review committed offset refresh logic

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15553:
--
Component/s: clients

> Review committed offset refresh logic
> -
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
>  
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.





[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Component/s: clients

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkClientDelegate.poll.
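One way to avoid a tight loop is to derive the poll timeout from when the next unit of work is actually due. The sketch below is illustrative only (the names are not Kafka's) and assumes each request manager reports its next scheduled run time:

```java
// Hypothetical sketch: compute a blocking timeout for
// networkClientDelegate.poll() from the managers' next run times,
// clamped to [0, maxPollMs], so the thread sleeps instead of spinning.
class PollTimeout {
    static long timeoutMs(long nowMs, long[] nextRunTimesMs, long maxPollMs) {
        long earliest = Long.MAX_VALUE;
        for (long t : nextRunTimesMs)
            earliest = Math.min(earliest, t);
        if (earliest == Long.MAX_VALUE)
            return maxPollMs; // nothing scheduled: block up to the cap
        return Math.min(Math.max(earliest - nowMs, 0L), maxPollMs);
    }
}
```

With this, the background thread blocks in poll() until either network activity arrives or the earliest scheduled task becomes due.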





[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15173:
--
Component/s: clients

> Consumer event queues should be bounded
> ---
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is boundless, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and see the community 
> feedback to make a forward plan accordingly.
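A minimal sketch of question 2 above, assuming a fixed, arbitrary capacity and a fail-fast exception when the application event queue is full (the class name and exception choice are illustrative, not a proposed API):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded event queue that rejects new events
// once the capacity is reached, instead of growing without bound.
class BoundedEventQueue<T> {
    private final BlockingQueue<T> queue;

    BoundedEventQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Enqueue, or fail fast when the bound is reached.
    void add(T event) {
        if (!queue.offer(event))
            throw new IllegalStateException("consumer event queue is full");
    }

    T poll() {
        return queue.poll();
    }
}
```

Whether to fail fast, block, or drop when the bound is reached is exactly the open design question in this ticket; the sketch shows only the fail-fast option.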





[jira] [Updated] (KAFKA-15320) Document event queueing patterns

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15320:
--
Component/s: clients
 consumer

> Document event queueing patterns
> 
>
> Key: KAFKA-15320
> URL: https://issues.apache.org/jira/browse/KAFKA-15320
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> We need to first document the event enqueuing patterns in the 
> PrototypeAsyncConsumer. As part of this task, determine if it’s 
> necessary/beneficial to _conditionally_ add events and/or coalesce any 
> duplicate events in the queue.
> _Don’t forget to include diagrams for clarity!_
> This should be documented on the AK wiki.





[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781





[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15634:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
> 
>
> Key: KAFKA-15634
> URL: https://issues.apache.org/jira/browse/KAFKA-15634
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> What is the point of the code in the initial {{while}} loop since the receive 
> is delayed and thus there is no {{throttleDelayMs}} received in the client?





[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set&lt;TopicPartition&gt; partitions) {
> subscriptions.assignFromUser(partitions);
> client.updateMetadata(initialUpdateResponse);
> // A dummy metadata update to ensure valid leader epoch.
> metadata.updateWithCurrentRequestVersion(
> RequestTestUtils.metadataUpdateWithIds(
> "dummy",
> 1, 
> Collections.emptyMap(),
> singletonMap(topicName, 4),
> tp -> validLeaderEpoch, topicIds
> ),
> false,
> 0L
> );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.





[jira] [Updated] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15651:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate auto commit guarantees during Consumer.assign()
> ---
>
> Key: KAFKA-15651
> URL: https://issues.apache.org/jira/browse/KAFKA-15651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> In the {{assign()}} method implementation, both {{KafkaConsumer}} and 
> {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this 
> intentional? [~junrao] asks in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]:
> {quote}Do we guarantee that the new owner of the unsubscribed partitions 
> could pick up the latest committed offset?
> {quote}
> Let's confirm whether the asynchronous approach is acceptable and correct. If 
> it is, great, let's enhance the documentation to briefly explain why. If it 
> is not, let's correct the behavior if it's within the API semantic 
> expectations.





[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate ConsumerNetworkThread's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE
> // upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}
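For context on what the comment is getting at, here is a toy version of the pattern under test. The `PollResult` type and `poll()` signature below are invented for illustration and do not match the real `NetworkClientDelegate` API; the idea is that a manager with no pending work reports `Long.MAX_VALUE` as its time-until-next-poll, regardless of the smaller, non-MAX hint it was given.

```java
// Toy illustration (not the real NetworkClientDelegate API): a poll result
// carries "time until next poll", and a manager with no pending requests
// reports Long.MAX_VALUE, ignoring the non-MAX retry backoff it was handed.
public class PollResultTimer {
    record PollResult(long timeUntilNextPollMs) { }

    static PollResult poll(boolean hasPendingRequests, long retryBackoffMs) {
        // Only pending work imposes a wake-up time; otherwise sleep "forever".
        return hasPendingRequests ? new PollResult(retryBackoffMs)
                                  : new PollResult(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(poll(true, 100).timeUntilNextPollMs());
        System.out.println(poll(false, 100).timeUntilNextPollMs());
    }
}
```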





[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15617:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions 
> and testInflightFetchOnPendingPartitions overlap
> --
>
> Key: KAFKA-15617
> URL: https://issues.apache.org/jira/browse/KAFKA-15617
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> In FetcherTest, the two tests testFetchingPendingPartitions and 
> testInflightFetchOnPendingPartitions have significant overlap. Perhaps the 
> former subsumes the latter?





[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response returns an error about partition leadership, fencing, 
> etc., a metadata refresh is triggered. That refresh takes time to complete, 
> and in the interim the consumer blindly retries the same failing fetch for 
> the partition over and over. Ideally, the consumer would temporarily ignore 
> those partitions, somewhat like the "pausing" approach, so that they are 
> skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.
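The "temporarily ignore" idea can be sketched as bookkeeping like the following. All names here are illustrative, not Kafka's fetch path: partitions that triggered a refresh are parked until the metadata version advances, so fetch preparation skips them instead of retrying in a tight loop.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of parking partitions that are waiting on a metadata refresh.
// Names are invented; the real consumer would key this by TopicPartition.
public class PendingMetadataFetch {
    private final Set<String> pausedForMetadata = new HashSet<>();
    private int pausedAtVersion = -1;
    private int metadataVersion = 0;

    void onFetchError(String partition) {      // e.g. NOT_LEADER_OR_FOLLOWER
        pausedForMetadata.add(partition);
        pausedAtVersion = metadataVersion;     // refresh requested at this version
    }

    void onMetadataUpdate() {
        metadataVersion++;
        if (metadataVersion > pausedAtVersion)
            pausedForMetadata.clear();         // refresh processed: fetch again
    }

    List<String> fetchablePartitions(List<String> assigned) {
        return assigned.stream().filter(p -> !pausedForMetadata.contains(p)).toList();
    }

    public static void main(String[] args) {
        PendingMetadataFetch f = new PendingMetadataFetch();
        List<String> assigned = List.of("tp-0", "tp-1");
        f.onFetchError("tp-0");
        System.out.println(f.fetchablePartitions(assigned)); // tp-0 is skipped
        f.onMetadataUpdate();
        System.out.println(f.fetchablePartitions(assigned)); // tp-0 is back
    }
}
```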





[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15636:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?
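The distinction behind the question, with made-up numbers: a sensor that records one value per fetch response yields different total and avg aggregates, so an expectation computed one way will not match an assertion on the other.

```java
// Total vs. average of per-fetch byte counts: a test expecting the total
// (1500) will fail an assertion against the avg metric (500.0), and vice versa.
public class TotalVsAvg {
    public static void main(String[] args) {
        long[] bytesPerFetch = {300, 500, 700};   // hypothetical response sizes
        long total = 0;
        for (long b : bytesPerFetch) total += b;
        double avg = (double) total / bytesPerFetch.length;
        System.out.println("total=" + total + " avg=" + avg);
    }
}
```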





[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.
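A minimal sketch of why propagating a single Timer matters, with invented names (this is not Kafka's Timer class): every sub-step checks one shared deadline, so the whole updateFetchPositions()-style sequence is bounded by the caller's budget instead of letting a later step, such as waiting for validation, spin indefinitely.

```java
import java.time.Duration;

// One deadline shared across all sub-steps of a compound operation.
// Time is simulated via advance() to keep the demo deterministic.
public class DeadlineTimer {
    private final long deadlineMs;
    private long nowMs;

    DeadlineTimer(long nowMs, Duration timeout) {
        this.nowMs = nowMs;
        this.deadlineMs = nowMs + timeout.toMillis();
    }

    boolean expired() { return nowMs >= deadlineMs; }
    void advance(long ms) { nowMs += ms; }

    public static void main(String[] args) {
        DeadlineTimer timer = new DeadlineTimer(0, Duration.ofMillis(100));
        String[] steps = {"refreshCommittedOffsets", "validatePositions", "resetPositions"};
        for (String step : steps) {
            if (timer.expired()) {              // every step honors the same budget
                System.out.println("gave up before " + step);
                return;
            }
            System.out.println("running " + step);
            timer.advance(60);                  // pretend each step took 60 ms
        }
    }
}
```

Without the shared timer, the third step would run even though the 100 ms budget was already exhausted after the second.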





[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Labels: consumer-threading-refactor kip-848-client-support kip-848-preview  
(was: consumer-threading-refactor)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{{}networkClientDelegate.disconnectAsync{}}}.




