Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-01 Thread via GitHub


AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1587067096


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();
 
-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");

Review Comment:
   OK. Done.






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-01 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1587041449


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
  * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
  */
 // Visible for testing
-public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-long logEndOffset,
-
NavigableMap leaderEpochs) {
+static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata 
segmentMetadata,
+ long logEndOffset,
+ NavigableMap leaderEpochs) {
 long segmentEndOffset = segmentMetadata.endOffset();
 // Filter epochs that does not have any messages/records associated 
with them.
 NavigableMap segmentLeaderEpochs = 
buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
-Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {

Review Comment:
   I'm not sure if the check `segmentLastEpoch < leaderEpochs.firstKey()` makes 
sense or not.
   
   Suppose: 
   leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
   segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
   Now, delete_records are called and log start offset incremented to 100, so 
the new leader-epoch-file-cache will be: {(9, 100)}
   
   When entering this check, it'll fail because the segmentLastEpoch (7) will 
be < leaderEpochs.firstKey() (9). But we still want to delete this segment, 
right?
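   For illustration, a minimal self-contained sketch of the scenario above (the standalone class and `main` are hypothetical; the real check is `RemoteLogManager.isRemoteSegmentWithinLeaderEpochs`), showing how the `segmentLastEpoch < leaderEpochs.firstKey()` condition classifies segment1 once the log start offset moves to 100:
   ```java
   import java.util.NavigableMap;
   import java.util.TreeMap;

   public class EpochLineageCheckSketch {
       public static void main(String[] args) {
           // leader-epoch-file-cache after delete_records moved the log start offset to 100
           NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
           leaderEpochs.put(9, 100L);

           // epochs covered by segment1 (offset range 5-50)
           NavigableMap<Integer, Long> segmentLeaderEpochs = new TreeMap<>();
           segmentLeaderEpochs.put(5, 10L);
           segmentLeaderEpochs.put(7, 15L);

           int segmentLastEpoch = segmentLeaderEpochs.lastKey();
           // 7 < 9, so the check classifies segment1 as outside the leader epoch lineage,
           // even though (per the comment above) we still want to delete this segment.
           boolean withinLineage = !(segmentLastEpoch < leaderEpochs.firstKey()
                   || segmentLastEpoch > leaderEpochs.lastKey());
           System.out.println("within lineage? " + withinLineage); // prints: within lineage? false
       }
   }
   ```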



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
  * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
  */
 // Visible for testing
-public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-long logEndOffset,
-
NavigableMap leaderEpochs) {
+static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata 
segmentMetadata,
+ long logEndOffset,
+ NavigableMap leaderEpochs) {
 long segmentEndOffset = segmentMetadata.endOffset();
 // Filter epochs that does not have any messages/records associated 
with them.
 NavigableMap segmentLeaderEpochs = 
buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
-Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+"Remote segment epochs: {} and partition leader 
epochs: {}",
+segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
+return false;
+}
+// There can be overlapping remote log segments in the remote storage. 
(eg)
+// leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+// segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 
15)}
+// segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 
15), (9, 100)}, after leader-election.
+// When the segment1 gets deleted, then the log-start-offset = 51 and 
leader-epoch-file-cache gets updated to: {(7, 51), (9, 100)}.
+// While validating the segment2, we should ensure the overlapping 
remote log segments case.
+Integer segmentFirstEpoch = 
segmentLeaderEpochs.ceilingKey(leaderEpochs.firstKey());
+if (segmentFirstEpoch == null || 
!leaderEpochs.containsKey(segmentFirstEpoch)) {

Review Comment:
   Same here, if the above case makes sense, this check also fails to delete 
the segment.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:

Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-01 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1587028687


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize 
serdes for sink node %s", name()), e);

Review Comment:
   Should we split this up further, and have two try-catch blocks, one for the 
key, and one for the value, to narrow it down further and add key/value as 
information to the error message?
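   For reference, a rough self-contained sketch of what the split might look like (the helper methods, node name, and messages here are hypothetical stand-ins; the `ConfigException` constructor usage mirrors the hunk above, not an agreed-upon patch):
   ```java
   import org.apache.kafka.common.config.ConfigException;

   final class SerializerInitSketch {
       // hypothetical stand-ins for prepareKeySerializer/prepareValueSerializer
       static void prepareKeySerializer() { throw new ConfigException("no default key serde configured"); }
       static void prepareValueSerializer() { }

       static void init(final String nodeName) {
           try {
               prepareKeySerializer();
           } catch (final ConfigException e) {
               // a narrow catch lets the message say which serde (key vs. value) failed
               throw new ConfigException(
                   String.format("Failed to initialize key serdes for sink node %s", nodeName), e);
           }
           try {
               prepareValueSerializer();
           } catch (final ConfigException e) {
               throw new ConfigException(
                   String.format("Failed to initialize value serdes for sink node %s", nodeName), e);
           }
       }

       public static void main(String[] args) {
           try {
               init("KSTREAM-SINK-0000000003"); // hypothetical node name
           } catch (final ConfigException e) {
               System.out.println(e.getMessage()); // message identifies the key-side failure
           }
       }
   }
   ```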






Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2024-05-01 Thread via GitHub


mjsax commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-2089475025

   Very happy to see some activity on this PR. The release plan is in the wiki: 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0
   
   Feature freeze is May 29, so there are still 4 weeks until the merge deadline. 
It would be great to finally close this one out.





Re: [PR] DO NOT MERGE: Isolate Connect tests [kafka]

2024-05-01 Thread via GitHub


github-actions[bot] commented on PR #15229:
URL: https://github.com/apache/kafka/pull/15229#issuecomment-2089469362

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-01 Thread via GitHub


mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1587018485


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the 
receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during 
a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, 
"INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check 
which topics are missing, please look into the logs of the consumer group 
leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually 
used anymore, but we may hit it during a rolling upgrade from earlier versions
+ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Hit an unexpected exception 
during task assignment phase of rebalance."),
+SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED","Encountered fatal error, and 
should send shutdown request for the entire application.");

Review Comment:
   ```suggestion
   SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "A KafkaStreams instance 
encountered a fatal error and requested a shutdown for the entire 
application.");
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the 
receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during 
a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, 
"INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check 
which topics are missing, please look into the logs of the consumer group 
leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually 
used anymore, but we may hit it during a rolling upgrade from earlier versions
+ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Hit an unexpected exception 
during task assignment phase of rebalance."),

Review Comment:
   ```suggestion
   ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Internal task assignment error. 
Check the group leader logs for details."),
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the 
receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during 
a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, 
"INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check 
which topics are missing, please look into the logs of the consumer group 
leader. Only the leaders knows and logs the name of the missing topics."),

Review Comment:
   ```suggestion
   INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", 
"Missing metadata for source topics. Check the group leader logs for details."),
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the 
receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during 
a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, 
"INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check 
which topics are missing, please look into the logs of the consumer group 
leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually 
used anymore, 

[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Component/s: clients

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams 
config, and are ignored when set from a KStreams application. Both belong on 
the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in 
> an error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and window.inner.serde.class in 
StreamsConfig  (was: Deprecate window.size.ms and inner.serde.class in 
StreamsConfig)

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and inner.serde.class in StreamsConfig  
(was: Deprecate window.size.ms in StreamsConfig)

> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:{{window.size.ms}}  is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.


> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Labels: KIP  (was: needs-kip)

> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.





Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1587006765


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -55,14 +56,21 @@ public class ClusterConfig {
 private final Map> 
perBrokerOverrideProperties;
 
 @SuppressWarnings("checkstyle:ParameterNumber")
-private ClusterConfig(Type type, int brokers, int controllers, String 
name, boolean autoStart,
+private ClusterConfig(Type type, int brokers, int controllers, int 
disksPerBroker, String name, boolean autoStart,
   SecurityProtocol securityProtocol, String listenerName, File 
trustStoreFile,
   MetadataVersion metadataVersion, Map 
serverProperties, Map producerProperties,
   Map consumerProperties, Map 
adminClientProperties, Map saslServerProperties,
   Map saslClientProperties, Map> perBrokerOverrideProperties) {
+if (brokers < 0) {
+throw new IllegalArgumentException("Number of brokers must be 
greater or equal to zero.");
+}
+if (controllers <= 0 || disksPerBroker <= 0) {

Review Comment:
   `controllers <= 0` is acceptable in ZK mode. Also, `TestKitNodes` already 
guards against that:
   
   
https://github.com/apache/kafka/blob/89d8045a15b622805f65c3c6fbfde82606921f65/core/src/test/java/kafka/testkit/TestKitNodes.java#L90
   
   Hence, `ClusterConfig` does not require such a check, and we can add a comment 
explaining why we don't need to check `controllers` here.



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -104,6 +107,26 @@ public void testClusterTests() {
 }
 }
 
+@ClusterTests({
+@ClusterTest(clusterType = Type.ZK),

Review Comment:
   We should test all types for both cases: "default" + "custom"






[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-05-01 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842828#comment-17842828
 ] 

Phuc Hong Tran commented on KAFKA-15561:


[~kirktrue] just a quick question, why was this moved back to 3.8.0?

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, regex
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.





Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587003306


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration 
timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   BTW, please let me know if you don't have time for this. I'm happy to fix it 
myself if the bug I described above does exist.






Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1586996403


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration 
timeout) {
 return position.offset;
 
 updateFetchPositions(timer);
+timer.update();

Review Comment:
   (this comment is unrelated to this PR)
   
   It seems `AsyncConsumer#position` does not honour `WakeupException`? See the 
following test:
   ```scala
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  @Timeout(10)
  def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
    val topicPartition = new TopicPartition("abc", 15)
    val consumer = createConsumer()
    consumer.assign(List(topicPartition).asJava)
    val service = Executors.newSingleThreadExecutor()
    service.execute(() => {
      TimeUnit.SECONDS.sleep(1)
      consumer.wakeup()
    })
    try assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
    finally {
      service.shutdownNow()
      service.awaitTermination(1, TimeUnit.SECONDS)
    }
  }
   ```






[jira] [Commented] (KAFKA-16655) deflake ZKMigrationIntegrationTest.testDualWrite

2024-05-01 Thread Alyssa Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842820#comment-17842820
 ] 

Alyssa Huang commented on KAFKA-16655:
--

[https://github.com/apache/kafka/pull/15845/files] 

> deflake ZKMigrationIntegrationTest.testDualWrite
> 
>
> Key: KAFKA-16655
> URL: https://issues.apache.org/jira/browse/KAFKA-16655
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Minor
>
> {code:java}
> Failed to map supported failure 'org.opentest4j.AssertionFailedError: 
> expected: not equal but was: <0>' with mapper 
> 'org.gradle.api.internal.tasks.testing.failure.mappers.OpenTestAssertionFailedMapper@59b5251d':
>  Cannot invoke "Object.getClass()" because "obj" is null
> > Task :core:test
> kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8] failed, 
> log available in 
> /Users/ahuang/ce-kafka/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8].test.stdout
> Gradle Test Run :core:test > Gradle Test Executor 8 > 
> ZkMigrationIntegrationTest > testDualWrite(ClusterInstance) > testDualWrite 
> [8] Type=ZK, MetadataVersion=3.8-IV0, Security=PLAINTEXT FAILED
> org.opentest4j.AssertionFailedError: expected: not equal but was: <0>
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at 
> app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277)
> at 
> app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:119)
> at 
> app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:111)
> at 
> app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2121)
> at 
> app//kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:995)
>  {code}
> This test occasionally fails due to stale broker epoch exceptions, which in 
> turn causes allocate producer ids to fail.
> Also fixes {{sendAllocateProducerIds}} erroneously returning 0 as the 
> `producerIdStart` in error cases (because `onComplete` only accounts for 
> timeouts and ignores any other error code)
> {code:java}
> [2024-04-12 18:45:08,820] INFO [ControllerServer id=3000] 
> allocateProducerIds: event failed with StaleBrokerEpochException in 19 
> microseconds. (org.apache.kafka.controller.QuorumController:765) {code}





[PR] KAFKA-16655 Deflaking ZKMigrationIntegrationTest.testDualWrite [kafka]

2024-05-01 Thread via GitHub


ahuang98 opened a new pull request, #15845:
URL: https://github.com/apache/kafka/pull/15845

   This test occasionally fails due to stale broker epoch exceptions, which in 
turn causes allocate producer ids to fail.
   
   Adds retries as stale broker epoch is a retriable issue, and fixes 
sendAllocateProducerIds returning 0 as the producerIdStart in error cases
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16655) deflake ZKMigrationIntegrationTest.testDualWrite

2024-05-01 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16655:


 Summary: deflake ZKMigrationIntegrationTest.testDualWrite
 Key: KAFKA-16655
 URL: https://issues.apache.org/jira/browse/KAFKA-16655
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang
Assignee: Alyssa Huang


{code:java}
Failed to map supported failure 'org.opentest4j.AssertionFailedError: expected: 
not equal but was: <0>' with mapper 
'org.gradle.api.internal.tasks.testing.failure.mappers.OpenTestAssertionFailedMapper@59b5251d':
 Cannot invoke "Object.getClass()" because "obj" is null

> Task :core:test
kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8] failed, 
log available in 
/Users/ahuang/ce-kafka/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8].test.stdout

Gradle Test Run :core:test > Gradle Test Executor 8 > 
ZkMigrationIntegrationTest > testDualWrite(ClusterInstance) > testDualWrite [8] 
Type=ZK, MetadataVersion=3.8-IV0, Security=PLAINTEXT FAILED
org.opentest4j.AssertionFailedError: expected: not equal but was: <0>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277)
at 
app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:119)
at 
app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:111)
at 
app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2121)
at 
app//kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:995)
 {code}

This test occasionally fails due to stale broker epoch exceptions, which in 
turn causes allocate producer ids to fail.

Also fixes {{sendAllocateProducerIds}} erroneously returning 0 as the 
`producerIdStart` in error cases (because `onComplete` only accounts for 
timeouts and ignores any other error code)


{code:java}
[2024-04-12 18:45:08,820] INFO [ControllerServer id=3000] allocateProducerIds: 
event failed with StaleBrokerEpochException in 19 microseconds. 
(org.apache.kafka.controller.QuorumController:765) {code}





Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1586968300


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();
 
-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");

Review Comment:
   Maybe we should use the same exception message `Interrupted waiting for 
results from fetching records` for consistency.
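   For what it's worth, a minimal runnable sketch of the contract being discussed (the class, method, and `main` are hypothetical; the real code is `FetchBuffer.awaitNotEmpty`), using the suggested message:
   ```java
   import org.apache.kafka.common.errors.InterruptException;
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;

   final class InterruptCheckSketch {
       // If the caller's thread is already interrupted and we are not going to block
       // (timer expired), surface InterruptException rather than returning silently,
       // matching the KafkaConsumer.poll(Duration) contract.
       static void awaitNotEmptySketch(Timer timer) {
           timer.update();
           if (timer.isExpired()) {
               if (Thread.interrupted())
                   throw new InterruptException("Interrupted waiting for results from fetching records");
               return;
           }
           // otherwise the actual wait/await call would throw InterruptedException itself
       }

       public static void main(String[] args) {
           Thread.currentThread().interrupt();
           try {
               awaitNotEmptySketch(Time.SYSTEM.timer(0));
           } catch (InterruptException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```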






[PR] KAFKA-16637: KIP-848 does not work well [kafka]

2024-05-01 Thread via GitHub


kirktrue opened a new pull request, #15844:
URL: https://github.com/apache/kafka/pull/15844

   This issue is related to an optimization for offset fetch logic.
   
   When a user calls `Consumer.poll()`, among other things, the consumer 
performs a network request to fetch any previously-committed offsets so it can 
determine from where to start fetching new records. When the user passes in a 
timeout of zero, it's almost always the case that the offset fetch network 
request will not be performed within 0 milliseconds. However, the consumer 
still sends out the request and handles the response when it is received, 
usually a few milliseconds later. In this first attempt, the lookup fails and 
the `poll()` loops back around. Given that this timeout is the common case, the 
consumer caches the offset fetch response/result from the first attempt (even 
though it timed out) because it knows that the next call to `poll()` is going 
to attempt the exact same operation. When it is later attempted a second time, 
the response is already there from the first attempt such that the consumer 
doesn't need to perform a network request.
   
   The existing consumer has implemented this caching in 
[PendingCommittedOffsetRequest](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132).
 The new consumer has implemented it in 
[CommitRequestManager](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510).
 The core issue is the new consumer implementation is clearing out the first 
attempt's cached result too aggressively. The effect being that the second (and 
subsequent) attempts fail to find any previous attempt's cached result, and all 
submit network requests, which all fail. Thus the consumer never makes any 
headway.
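   As a rough illustration of the caching idea (the class and method names here are hypothetical; the real logic lives in the `PendingCommittedOffsetRequest` and `CommitRequestManager` code linked above), a minimal sketch in which a pending fetch for the same partitions is reused by the next attempt instead of being discarded:
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   final class OffsetFetchCacheSketch<P> {
       private Set<P> pendingPartitions;
       private CompletableFuture<Map<P, Long>> pendingResult;

       CompletableFuture<Map<P, Long>> fetchCommitted(Set<P> partitions) {
           if (pendingResult != null && partitions.equals(pendingPartitions)) {
               CompletableFuture<Map<P, Long>> result = pendingResult;
               if (result.isDone()) {
                   // second attempt: the first attempt's response arrived in the meantime,
                   // so hand it out and only now clear the cache
                   pendingPartitions = null;
                   pendingResult = null;
               }
               return result;
           }
           // first attempt: send the request and cache it, even if the caller's timeout
           // (e.g. poll(Duration.ZERO)) expires before the response arrives
           pendingPartitions = partitions;
           pendingResult = sendOffsetFetchRequest(partitions);
           return pendingResult;
       }

       private CompletableFuture<Map<P, Long>> sendOffsetFetchRequest(Set<P> partitions) {
           return new CompletableFuture<>(); // stands in for the real network round trip
       }
   }
   ```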
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-05-01 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16637:
--
Priority: Blocker  (was: Minor)

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here





[jira] [Commented] (KAFKA-16427) KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER

2024-05-01 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842817#comment-17842817
 ] 

Kirk True commented on KAFKA-16427:
---

This bug can be triggered with an embarrassingly simple integration test (n)

But it appears this bug can be fixed with an embarrassingly simple change, so 
(y)

> KafkaConsumer#position() does not respect timeout when group protocol is 
> CONSUMER
> -
>
> Key: KAFKA-16427
> URL: https://issues.apache.org/jira/browse/KAFKA-16427
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> When
> `long position(TopicPartition partition, final Duration timeout);`
> is called on an unknown topic partition (and auto creation is disabled), the 
> method fails to adhere to the timeout supplied.
> e.g. the following warning is logged continuously as metadata fetches are 
> retried 
> [2024-03-26 11:03:48,589] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Error while fetching metadata with correlation id 200 : 
> \{nonexistingTopic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient:1313)





Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1586962262


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -399,7 +398,7 @@ public TopicCreationResponse createOrFindTopics(NewTopic... 
topics) {
 }
 }
 if (topicsByName.isEmpty()) return EMPTY_CREATION;
-String topicNameList = Utils.join(topicsByName.keySet(), "', '");
+String topicNameList = String.join("', '", 
topicsByName.keySet()).replace("[", "").replace("]", "");

Review Comment:
   Why we need those `replace`?



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -630,7 +629,7 @@ public Map describeTopicConfigs(String... 
topicNames) {
 if (topics.isEmpty()) {
 return Collections.emptyMap();
 }
-String topicNameList = String.join(", ", topics);
+String topicNameList = String.join(", ", topics).replace("[", 
"").replace("]", "");

Review Comment:
   ditto



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -475,7 +474,7 @@ public Map 
describeTopics(String... topics) {
 if (topics == null) {
 return Collections.emptyMap();
 }
-String topicNameList = String.join(", ", topics);
+String topicNameList = String.join(", ", topics).replace("[", 
"").replace("]", "");

Review Comment:
   ditto



##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -216,7 +217,7 @@ public List> 
consume(TopicPartition topicPartitio
 Function0 messageSupplier = () ->
 String.format("Could not consume %d records of %s from offset 
%d in %d ms. %d message(s) consumed:%s%s",
 expectedTotalCount, topicPartition, fetchOffset, 
timeoutMs, records.size(), sep,
-Utils.join(records, sep));
+String.join(sep, Arrays.toString(records.toArray(;

Review Comment:
   `records.stream().map(Object::toString).collect(Collectors.joining(","))`



##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -230,8 +231,8 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && 
maxLagFollower.lastCaught
 "\nHighWatermark:  " + quorumInfo.highWatermark() +
 "\nMaxFollowerLag: " + maxFollowerLag +
 "\nMaxFollowerLagTimeMs:   " + maxFollowerLagTimeMs +
-"\nCurrentVoters:  " + 
Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", 
",") +
-"\nCurrentObservers:   " + 
Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", 
"]", ",")
+"\nCurrentVoters:  " + "[" + 
quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(","))
 + "]" +

Review Comment:
   
`quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",",
 "[", "]"))`



##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -230,8 +231,8 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && 
maxLagFollower.lastCaught
 "\nHighWatermark:  " + quorumInfo.highWatermark() +
 "\nMaxFollowerLag: " + maxFollowerLag +
 "\nMaxFollowerLagTimeMs:   " + maxFollowerLagTimeMs +
-"\nCurrentVoters:  " + 
Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", 
",") +
-"\nCurrentObservers:   " + 
Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", 
"]", ",")
+"\nCurrentVoters:  " + "[" + 
quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(","))
 + "]" +
+"\nCurrentObservers:   " + "[" + 
quorumInfo.observers().stream().map(QuorumInfo.ReplicaState::replicaId).map(Objects::toString).collect(Collectors.joining(","))
 + "]"

Review Comment:
   ditto



##
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java:
##
@@ -318,7 +320,7 @@ void log() {
 }
 }
 log.info("{}: consumer waiting for {} message(s), starting with: 
{}",
-id, numToReceive, Utils.join(list, ", "));
+id, numToReceive, String.join(", ", 
Arrays.toString(list.toArray(;

Review Comment:
   ditto



##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -488,12 +488,12 @@ private static void clearAllThrottles(Admin adminClient,
 targetParts.forEach(t -> brokers.addAll(t.getValue()));
 
 System.out.printf("Clearing broker-level throttles 

[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-05-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842816#comment-17842816
 ] 

Haruki Okada commented on KAFKA-16541:
--

[~junrao] Yes.
My concern now is that only changing renameDir may not be enough, so I'm trying to 
figure out if we can fix this in another way without checking all call paths.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of fsync of leader-epoch ckeckpoint file in some path for performance reason.
> However, since now checkpoint file is flushed to the device asynchronously by 
> OS, content would corrupt if OS suddenly crashes (e.g. by power failure, 
> kernel panic) in the middle of flush.
> Corrupted checkpoint file could prevent Kafka broker to start-up





[PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]

2024-05-01 Thread via GitHub


kirktrue opened a new pull request, #15843:
URL: https://github.com/apache/kafka/pull/15843

   The AsyncKafkaConsumer implementation of `position(TopicPartition, 
Duration)` was not updating its internal `Timer`, causing it to execute the 
loop forever. Adding a call to update the `Timer` at the bottom of the loop 
fixes the issue.
   
   An integration test was added to catch this case; it fails without the newly 
added call to `Timer.update(long)`.
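   To illustrate the pattern (a hedged sketch; the class, method, and `tryOnce` helper are hypothetical, while `Timer` is `org.apache.kafka.common.utils.Timer` as used by the consumer):
   ```java
   import java.time.Duration;
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;

   final class TimerLoopSketch {
       // stands in for an attempt that never succeeds, e.g. fetching positions for an unknown partition
       private static boolean tryOnce() {
           return false;
       }

       static boolean waitForPosition(Duration timeout) {
           Timer timer = Time.SYSTEM.timer(timeout);
           do {
               if (tryOnce())
                   return true;
               // Without this call the timer never observes elapsed time, so
               // timer.isExpired() stays false and the loop spins forever.
               timer.update();
           } while (timer.notExpired());
           return false;
       }

       public static void main(String[] args) {
           System.out.println(waitForPosition(Duration.ofMillis(100))); // returns false after ~100 ms
       }
   }
   ```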
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16647) Remove setMetadataDirectory from BrokerNode/ControllerNode

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16647.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove setMetadataDirectory from BrokerNode/ControllerNode
> --
>
> Key: KAFKA-16647
> URL: https://issues.apache.org/jira/browse/KAFKA-16647
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.8.0
>
>
> `TestKitNodes` does not enable callers to define the location of the "base 
> folder". That makes sense to me since callers should not care about it, which 
> means the location of the metadata folder should be transparent to callers. Hence, 
> the setter for the metadata folder is useless.





Re: [PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]

2024-05-01 Thread via GitHub


chia7712 merged PR #15833:
URL: https://github.com/apache/kafka/pull/15833





Re: [PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on PR #15833:
URL: https://github.com/apache/kafka/pull/15833#issuecomment-2089361742

   I looped the failed tests on my local machine, and they pass.
   ```
   ./gradlew cleanTest \
     :connect:runtime:test \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsOverriddenConsumerGroupId \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted \
     :metadata:test \
       --tests QuorumControllerTest.testBrokerHeartbeatDuringMigration \
       --tests QuorumControllerTest.testFenceMultipleBrokers \
       --tests QuorumControllerTest.testConfigurationOperations \
     :connect:mirror:test \
       --tests IdentityReplicationIntegrationTest.testReplicateFromLatest \
       --tests MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow \
       --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs \
       --tests MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault \
       --tests MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs \
     :core:test \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails \
       --tests ZkMigrationIntegrationTest.testDualWrite
   ```
   Also, the changes in this PR should be unrelated to those failures.





Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on code in PR #15841:
URL: https://github.com/apache/kafka/pull/15841#discussion_r1586954293


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() 
throws Exception {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutConnectorConfigProducerError() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+.thenReturn(CONFIGS_SERIALIZED.get(0));
+when(configLog.sendWithReceipt(anyString(), 
any(byte[].class))).thenReturn(producerFuture);
+
+// Verify initial state
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+assertEquals(0, configState.connectors().size());
+
+when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test";
+
+// verify that the producer exception from KafkaBasedLog::send is 
propagated
+ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+SAMPLE_CONFIGS.get(0), null));
+assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));

Review Comment:
   Could we verify the `e.getCause()` to make sure the error is caused by what 
we expect?
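   One possible shape for that assertion, sketched here as a fragment only since the exact wrapping done by `KafkaConfigBackingStore` determines the cause chain (assumes `org.junit.jupiter.api.Assertions.assertInstanceOf`, JUnit 5.8+, and `java.util.concurrent.ExecutionException` are imported):
   ```java
   Throwable cause = e.getCause();
   // unwrap the future's ExecutionException if the store propagates it directly
   while (cause instanceof ExecutionException) {
       cause = cause.getCause();
   }
   assertInstanceOf(TopicAuthorizationException.class, cause);
   ```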



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() 
throws Exception {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutConnectorConfigProducerError() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+.thenReturn(CONFIGS_SERIALIZED.get(0));
+when(configLog.sendWithReceipt(anyString(), 
any(byte[].class))).thenReturn(producerFuture);
+
+// Verify initial state
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+assertEquals(0, configState.connectors().size());
+
+when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test";
+
+// verify that the producer exception from KafkaBasedLog::send is 
propagated
+ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+SAMPLE_CONFIGS.get(0), null));
+assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));
+
+configStorage.stop();
+verify(configLog).stop();
+}
+
+@Test
+public void testRemoveConnectorConfigSlowProducer() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+@SuppressWarnings("unchecked")
+Future connectorConfigProducerFuture = 
mock(Future.class);
+
+@SuppressWarnings("unchecked")
+Future targetStateProducerFuture = mock(Future.class);
+
+when(configLog.sendWithReceipt(anyString(), isNull()))
+// tombstone for the connector config
+.thenReturn(connectorConfigProducerFuture)
+// tombstone for the connector target state
+.thenReturn(targetStateProducerFuture);
+
+
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), 
any(TimeUnit.class)))
+.thenAnswer((Answer) invocation -> {
+time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+return null;
+});
+
+// the future get timeout is expected to be reduced according to how 
long the previous Future::get took
+when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+.thenAnswer((Answer) invocation -> {
+time.sleep(1000);
+

[jira] [Assigned] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16654:
--

Assignee: TaiJuWu  (was: Chia-Ping Tsai)

> Refactor kafka.test.annotation.Type and ClusterTestExtensions
> -
>
> Key: KAFKA-16654
> URL: https://issues.apache.org/jira/browse/KAFKA-16654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>
> It seems to me the refactor could include the following tasks.
> 1. Change `invocationContexts`, the method invoked by `ClusterTemplate`, and 
> generate-related methods in `ClusterTestExtensions` to return a 
> java.util.Collection instead of accepting a `java.util.function.Consumer`. 
> That brings two benefits: 1) simpler production code, since we don't need to 
> create a List and then pass a function around just to collect results; 2) it 
> is easier to write unit tests.
> 2. Separate `provideTestTemplateInvocationContexts` into multiple methods, one 
> per annotation. That helps us write tests and makes the code more readable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-05-01 Thread via GitHub


mjsax commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1586935977


##
docs/streams/developer-guide/config-streams.html:
##
@@ -240,24 +240,29 @@ num.standby.replicas
-  acceptable.recovery.lag
+  acceptable.recovery.lag
 Medium
 The maximum acceptable lag (number of offsets to 
catch up) for an instance to be considered caught-up and ready for the active 
task.
 1
   
-  application.server
+  application.server
 Low
 A host:port pair pointing to an embedded user 
defined endpoint that can be used for discovering the locations of
   state stores within a single Kafka Streams application. The 
value of this must be different for each instance
   of the application.
 the empty string
   
-  buffered.records.per.partition
+  buffered.records.per.partition
 Low
 The maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads.
+10485760
+  
+  cache.max.bytes.buffering (Deprecated. Use 
cache.max.bytes instead.)

Review Comment:
   ```suggestion
 cache.max.bytes.buffering (Deprecated. 
Use statestore.cache.max.bytes instead.)
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -564,25 +565,22 @@ public class StreamsConfig extends AbstractConfig {
 static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store 
implementations to plug in to DSL operators. Must implement the 
org.apache.kafka.streams.state.DslStoreSuppliers interface.";
 static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = 
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
 
-/** {@code default.windowed.key.serde.inner} */
+/** {@code default.windowed.key.serde.inner
+ * @deprecated since 3.0.0} */
 @SuppressWarnings("WeakerAccess")
 @Deprecated
 public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = 
"default.windowed.key.serde.inner";
 private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = 
"Default serializer / deserializer for the inner class of a windowed key. Must 
implement the " +
 "org.apache.kafka.common.serialization.Serde interface.";
 
-/** {@code default.windowed.value.serde.inner} */
+/** {@code default.windowed.value.serde.inner
+ * @deprecated since 3.0.0 } */
 @SuppressWarnings("WeakerAccess")
 @Deprecated
 public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = 
"default.windowed.value.serde.inner";
 private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = 
"Default serializer / deserializer for the inner class of a windowed value. 
Must implement the " +
 "org.apache.kafka.common.serialization.Serde interface.";
 
-public static final String WINDOWED_INNER_CLASS_SERDE = 
"windowed.inner.class.serde";

Review Comment:
   Seems we cannot just remove it, but should only mark it as deprecated instead? 
Also, given that deprecation is part of KIP-1020, should this be done only in 
the KIP-1020 PR, with this PR doing only the docs cleanup?



##
docs/streams/developer-guide/config-streams.html:
##
@@ -326,8 +331,15 @@ num.standby.replicasDefault serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface.

Review Comment:
   Seems this line does not belong to `dsl.store.suppliers.class` config?



##
docs/streams/developer-guide/config-streams.html:
##
@@ -300,7 +305,7 @@ num.standby.replicasnull
   
-  default.windowed.key.serde.inner
+  default.windowed.key.serde.inner 
(Deprecated.)

Review Comment:
   Below is `default.window.value.serde.inner` which was also deprecated, 
right? (L308 original, new L313)



##
docs/streams/developer-guide/config-streams.html:
##
@@ -326,8 +331,15 @@ num.standby.replicasDefault serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface.
+null
   
-  max.task.idle.ms
+  default.windowed.value.serde.inner 
(Deprecated.)

Review Comment:
   It seems `default.windowed.value.serde.inner` already exists? Why do we add 
it here?



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -647,7 +645,8 @@ public class StreamsConfig extends AbstractConfig {
 @SuppressWarnings("WeakerAccess")
 public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = 
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
-/** {@code auto.include.jmx.reporter} */
+/** {@code 

[jira] [Commented] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions

2024-05-01 Thread TaiJuWu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842808#comment-17842808
 ] 

TaiJuWu commented on KAFKA-16654:
-

Hi  [~chia7712] ,

If you are not working on it, could you assign it to me? 

> Refactor kafka.test.annotation.Type and ClusterTestExtensions
> -
>
> Key: KAFKA-16654
> URL: https://issues.apache.org/jira/browse/KAFKA-16654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> It seems to me the refactor could include the following tasks.
> 1. Change `invocationContexts`, the method invoked by `ClusterTemplate`, and 
> generate-related methods in `ClusterTestExtensions` to return a 
> java.util.Collection instead of accepting a `java.util.function.Consumer`. 
> That brings two benefits: 1) simpler production code, since we don't need to 
> create a List and then pass a function around just to collect results; 2) it 
> is easier to write unit tests.
> 2. Separate `provideTestTemplateInvocationContexts` into multiple methods, one 
> per annotation. That helps us write tests and makes the code more readable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842805#comment-17842805
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the background! Makes sense.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16614) Disallow `@ClusterTemplate("")`

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16614.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Disallow `@ClusterTemplate("")`
> ---
>
> Key: KAFKA-16614
> URL: https://issues.apache.org/jira/browse/KAFKA-16614
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
> Fix For: 3.8.0
>
>
> `@ClusterTemplate` enables us to create dynamic configs, and it expects a 
> method name that can create server configs at runtime. It throws an error 
> when we pass a nonexistent method name, but it works if we pass an empty 
> name.
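
A minimal sketch of the intended usage (the generator method and its parameter 
type are illustrative, not copied verbatim from the test framework):

{code:java}
// Hypothetical generator referenced by name; at the time of this change such
// methods take a consumer-style argument (see KAFKA-16654) rather than
// returning a collection.
static void generateConfigs(ClusterGenerator generator) {
    // populate the generator with ClusterConfig instances for each run
}

@ClusterTemplate("generateConfigs")   // must name an existing generator method
void testWithGeneratedConfigs() { }

// @ClusterTemplate("")               // an empty name is now rejected as well
{code}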



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-05-01 Thread via GitHub


chia7712 merged PR #15800:
URL: https://github.com/apache/kafka/pull/15800


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions

2024-05-01 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16654:
--

 Summary: Refactor kafka.test.annotation.Type and 
ClusterTestExtensions
 Key: KAFKA-16654
 URL: https://issues.apache.org/jira/browse/KAFKA-16654
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


It seems to me the refactor could include the following tasks.

1. Change `invocationContexts`, the method invoked by `ClusterTemplate`, and 
generate-related methods in `ClusterTestExtensions` to return a 
java.util.Collection instead of accepting a `java.util.function.Consumer`. That 
brings two benefits: 1) simpler production code, since we don't need to create a 
List and then pass a function around just to collect results; 2) it is easier to 
write unit tests. A sketch of the two styles is shown below.

2. Separate `provideTestTemplateInvocationContexts` into multiple methods, one per 
annotation. That helps us write tests and makes the code more readable.
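
For illustration, here is a minimal, self-contained sketch of the two styles 
(the types are stand-ins; the real methods deal with 
TestTemplateInvocationContext and the test framework's generator interfaces):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RefactorSketch {
    // Current consumer-based style: the caller creates a list and hands in a
    // collector just to gather the generated contexts.
    static void generateConsumerStyle(Consumer<String> collector) {
        collector.accept("context-1");
        collector.accept("context-2");
    }

    // Proposed collection-returning style: simpler call sites and trivially
    // assertable in unit tests.
    static List<String> generateCollectionStyle() {
        List<String> contexts = new ArrayList<>();
        contexts.add("context-1");
        contexts.add("context-2");
        return contexts;
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        generateConsumerStyle(collected::add);
        System.out.println(collected);                 // [context-1, context-2]
        System.out.println(generateCollectionStyle()); // [context-1, context-2]
    }
}
{code}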



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586871366


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2415,17 +2476,30 @@ public void resign(int epoch) {
 @Override
 public Optional> createSnapshot(
 OffsetAndEpoch snapshotId,
-long lastContainedLogTime
+long lastContainedLogTimestamp
 ) {
-return RecordsSnapshotWriter.createWithHeader(
-() -> log.createNewSnapshot(snapshotId),
-MAX_BATCH_SIZE_BYTES,
-memoryPool,
-time,
-lastContainedLogTime,
-CompressionType.NONE,
-serde
-);
+if (!isInitialized()) {
+throw new IllegalStateException("Cannot create snapshot before the 
replica has been initialized");
+}
+
+return log.createNewSnapshot(snapshotId).map(writer -> {
+long lastContainedLogOffset = snapshotId.offset() - 1;

Review Comment:
   Yes. I have this issue 
[KAFKA-14620](https://issues.apache.org/jira/browse/KAFKA-14620) to introduce 
the `SnapshotId` type. I can fix this on that PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14620) Add a type for SnapshotId

2024-05-01 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14620:
---
Description: 
We have seen issues where the state machine assumes that the offset in the 
snapshot id is inclusive. I think adding a type that makes this clear would help 
developers and reviewers catch such issues.

The snapshot id type should support the helper function lastContainedLogOffset.

  was:We have seen issues where the state machine assumes that offset in the 
snapshot id is inclusive. I think adding at type that makes this clear would 
help developers and reviewers catch such issues.


> Add a type for SnapshotId
> -
>
> Key: KAFKA-14620
> URL: https://issues.apache.org/jira/browse/KAFKA-14620
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
>
> We have seen issues where the state machine assumes that the offset in the 
> snapshot id is inclusive. I think adding a type that makes this clear would 
> help developers and reviewers catch such issues.
> The snapshot id type should support the helper function lastContainedLogOffset.
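
A minimal sketch of such a type (the names and fields are assumptions based on 
the description above and on the existing OffsetAndEpoch, not the eventual 
implementation):

{code:java}
public final class SnapshotId {
    private final long endOffset; // exclusive end offset of the snapshot
    private final int epoch;

    public SnapshotId(long endOffset, int epoch) {
        this.endOffset = endOffset;
        this.epoch = epoch;
    }

    // Makes the inclusive/exclusive distinction explicit at the type level,
    // instead of every caller computing "offset - 1" by hand.
    public long lastContainedLogOffset() {
        return endOffset - 1;
    }

    public long endOffset() { return endOffset; }
    public int epoch() { return epoch; }
}
{code}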



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586869606


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -370,8 +363,52 @@ private void maybeFireLeaderChange() {
 }
 }
 
-@Override
-public void initialize() {
+public void initialize(
+Map voterAddresses,
+String listenerName,
+QuorumStateStore quorumStateStore,
+Metrics metrics
+) {
+partitionState = new KRaftControlRecordStateMachine(
+Optional.of(VoterSet.fromAddressSpecs(listenerName, 
voterAddresses)),

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586867970


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -370,8 +363,52 @@ private void maybeFireLeaderChange() {
 }
 }
 
-@Override
-public void initialize() {
+public void initialize(
+Map voterAddresses,
+String listenerName,
+QuorumStateStore quorumStateStore,
+Metrics metrics

Review Comment:
   Yeah. I am also not happy with this move. We only do this delayed 
initialization because of integration tests (`QuorumTestHarness`, 
`KRaftClusterTestKit`). It is not needed by `**/src/main`.
   
   Once we have KIP-853 fully implemented, I should be able to fix the 
integration tests to not use the static voter set and the delayed 
initialization.
   
   I created [Remove delayed initialization because of static voter 
set](https://issues.apache.org/jira/browse/KAFKA-16653) to track this work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16653) Remove delayed initialization because of static voter set

2024-05-01 Thread Jira
José Armando García Sancio created KAFKA-16653:
--

 Summary: Remove delayed initialization because of static voter set
 Key: KAFKA-16653
 URL: https://issues.apache.org/jira/browse/KAFKA-16653
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: José Armando García Sancio


Once KRaft supports the AddVoter RPC, the QuorumTestHarness and 
KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the 
static voter set.

This should allow us to remove KRaft's support for delayed static voter set 
initialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586861717


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -159,67 +165,76 @@ public class KafkaRaftClient implements RaftClient {
 private final MemoryPool memoryPool;
 private final RaftMessageQueue messageQueue;
 private final QuorumConfig quorumConfig;
-private final KafkaRaftMetrics kafkaRaftMetrics;
-private final QuorumState quorum;
-private final RequestManager requestManager;
 private final RaftMetadataLogCleanerManager snapshotCleaner;
 
 private final Map, ListenerContext> listenerContexts = new 
IdentityHashMap<>();
 private final ConcurrentLinkedQueue> pendingRegistrations 
= new ConcurrentLinkedQueue<>();
 
+// These components need to be initialized by the method initialize() 
because they depend on the voter set
+/*
+ * The key invariant for the kraft control record state machine is that it 
has always read to the LEO. This is achived by:

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV [kafka]

2024-05-01 Thread via GitHub


cmccabe commented on code in PR #15810:
URL: https://github.com/apache/kafka/pull/15810#discussion_r1586861400


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) {
 }
 
 /**
- * Trigger a leader epoch bump if one is needed.
- *
- * We need to bump the leader epoch if:
- * 1. The leader changed, or
- * 2. The new replica list does not contain all the nodes that the old 
replica list did.
- *
- * Changes that do NOT fall in any of these categories will increase the 
partition epoch, but
- * not the leader epoch. Note that if the leader epoch increases, the 
partition epoch will
- * always increase as well; there is no case where the partition epoch 
increases more slowly
- * than the leader epoch.
+ * Trigger a leader epoch bump if one is needed because of replica 
reassignment.
  *
- * If the PartitionChangeRecord sets the leader field to something other 
than
- * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That 
takes care of
- * case 1. In this function, we check for cases 2 and 3, and handle them 
by manually
- * setting record.leader to the current leader.
- *
- * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica 
manager
- * that required that the leader epoch be bump whenever the ISR shrank. In 
MV 3.6 this leader
- * bump is not required when the ISR shrinks. Note, that the leader epoch 
is never increased if
- * the ISR expanded.
+ * Note that if the leader epoch increases, the partition epoch will 
always increase as well; there is no
+ * case where the partition epoch increases more slowly than the leader 
epoch.
+ */
+void 
triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord 
record) {

Review Comment:
   I agree that the name should be revisited. Better to do that in a follow-on 
PR, though, to avoid making this one too big.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV [kafka]

2024-05-01 Thread via GitHub


cmccabe commented on PR #15810:
URL: https://github.com/apache/kafka/pull/15810#issuecomment-2089185449

   > Should we have a new test for that specific case?
   
   Good point. Added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586860365


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -213,20 +213,21 @@ private void completeCurrentBatch() {
  *
  * @param valueCreator a function that uses the passed buffer to create 
the control
  *batch that will be appended. The memory records returned must 
contain one
- *control batch and that control batch have one record.
+ *control batch and that control batch have at least one record.
  */
-private void appendControlMessage(Function 
valueCreator) {
+public void appendControlMessages(Function 
valueCreator) {

Review Comment:
   Okay, I was trying to avoid decoding the `MemoryRecords`. If we are going to 
read the first batch, we don't even need `CreatedRecords`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-01 Thread via GitHub


junrao commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2089169862

   @clolov: Are you able to address the remaining comments? 3.8.0 code freeze 
is getting close. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-05-01 Thread via GitHub


cmccabe commented on PR #15838:
URL: https://github.com/apache/kafka/pull/15838#issuecomment-2089167707

   > Thanks for the patch, @cmccabe. I looked for other usages of this lock and 
see we're obtaining the write lock in DynamicBrokerConfig#updateBrokerConfig 
which gets called from DynamicConfigPublisher. Will we still need this locking 
when we have dropped ZK?
   
   I think we can simplify this code a lot once ZK is gone, yes. Having a 
single updater will help a lot. I suspect we can get rid of the lock at that 
point, although I'll have to look more later.
   
   > Can you add a comment next to the lock indicating what it is guarding?
   
   I added a clarification that the lock protects against concurrent 
reconfiguration operations, but not concurrent additions or removals of 
reconfigurables. As you mentioned, this is mostly only relevant to ZK mode 
since there we can have racing threads.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-05-01 Thread via GitHub


cmccabe commented on code in PR #15838:
URL: https://github.com/apache/kafka/pull/15838#discussion_r1586844632


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1541,6 +1541,36 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testReduceNumNetworkThreads(): Unit = {

Review Comment:
   I wanted to ensure that no deadlocks are introduced in the future by adding 
this test coverage. I don't think this PR is required to avoid a deadlock 
currently, however.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]

2024-05-01 Thread via GitHub


cmccabe commented on code in PR #15838:
URL: https://github.com/apache/kafka/pull/15838#discussion_r1586843624


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 addBrokerReconfigurable(controller.socketServer)
   }
 
-  def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+  def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
 verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)

Review Comment:
   Good catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-05-01 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842783#comment-17842783
 ] 

Jun Rao commented on KAFKA-16541:
-

[~ocadaruma] : Will you be able to work on this soon? The 3.8.0 code freeze is 
getting close. Thanks.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of the fsync of the leader-epoch checkpoint file in some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, the content could become corrupted if the OS suddenly crashes (e.g. 
> due to a power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-05-01 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1586840549


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   > I'm not clear on this:
   > 
   > 1. Segments that are eligible for upload to remote storage only when the 
lastStableOffset moves beyond the segment-to-be-uploaded-end-offset.
   > 2. When all the replicas loses local data (offline partition), then we 
consider the data in remote storage also lost. Currently, for this case, we 
don't have provision to serve the remote data.
   > 3. When firstUnstableOffsetMetadata is empty, we return highWatermark. 
With this patch, the highWatermark lower boundary is set to localLogStartOffset 
so there won't be an issue.
   > 
   
   That's true. It's just that that is yet another offset that we need to 
bound. I am also not sure if there are other side effects of adjusting HWM and 
LSO.
   
   Left some comments on https://github.com/apache/kafka/pull/15825.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-01 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1586835182


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -28,6 +28,7 @@ public final class LogOffsetMetadata {
 
 //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage 
module
 private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L;

Review Comment:
   We probably don't need this. The existing UNIFIED_LOG_UNKNOWN_OFFSET should 
be enough.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,6 +57,8 @@ public LogOffsetMetadata(long messageOffset,
 
 // check if this offset is already on an older segment compared with the 
given offset
 public boolean onOlderSegment(LogOffsetMetadata that) {
+if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || 
that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)

Review Comment:
   We probably don't need this. If messageOffsetOnly() is true, we can just 
return false.
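   A rough sketch of that suggestion (hedged: it relies on the existing 
messageOffsetOnly() helper and is not a drop-in patch):
   ```java
   public boolean onOlderSegment(LogOffsetMetadata that) {
       // Unknown segment position on either side: we cannot claim "older segment".
       if (this.messageOffsetOnly() || that.messageOffsetOnly())
           return false;
       return this.segmentBaseOffset < that.segmentBaseOffset;
   }
   ```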



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -65,6 +73,8 @@ private boolean onSameSegment(LogOffsetMetadata that) {
 // compute the number of bytes between this offset to the given offset
 // if they are on the same segment and this offset precedes the given 
offset
 public int positionDiff(LogOffsetMetadata that) {
+if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || 
that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
+return 1;

Review Comment:
   It's a bit hacky to do this here. I was thinking of doing this in 
DelayedFetch.
   
   ```
   if (endOffset.messageOffset != fetchOffset.messageOffset) {
 if (endOffset.messageOnly() || fetchOffset.messageOnly()) {
accumulatedSize += 1
 } else if (endOffset.onOlderSegment(fetchOffset)) {
   // Case F, this can happen when the new fetch operation is 
on a truncated leader
   ...
   }
   ```



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1428,7 +1428,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 */
   private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
 checkLogStartOffset(offset)
-localLog.convertToOffsetMetadataOrThrow(offset)
+if (remoteLogEnabled() && offset < localLogStartOffset()) {
+  new LogOffsetMetadata(offset, 
LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET)
+} else {
+  localLog.convertToOffsetMetadataOrThrow(offset)

Review Comment:
   I was thinking that we could change 
localLog.convertToOffsetMetadataOrThrow() such that if read() throws an 
exception, it just returns a message-only LogOffsetMetadata.
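   Roughly this shape (hedged: the real method lives in Scala's LocalLog, and the 
lookup helper and exception type shown here are placeholders):
   ```java
   LogOffsetMetadata convertToOffsetMetadata(long offset) {
       try {
           return lookupOffsetMetadata(offset);   // existing read-based path
       } catch (OffsetOutOfRangeException e) {
           return new LogOffsetMetadata(offset);  // message-only metadata
       }
   }
   ```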



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591019


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String 
connName,
 ConnectorClientConfigRequest connectorClientConfigRequest = new 
ConnectorClientConfigRequest(
 connName, connectorType, connectorClass, clientConfigs, 
clientType);
 List configValues = 
connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-if (configValues != null) {
-for (ConfigValue validatedConfigValue : configValues) {
-ConfigKey configKey = 
configKeys.get(validatedConfigValue.name());
-ConfigKeyInfo configKeyInfo = null;
-if (configKey != null) {
-if (configKey.group != null) {
-groups.add(configKey.group);
-}
-configKeyInfo = convertConfigKey(configKey, prefix);
-}
 
-ConfigValue configValue = new ConfigValue(prefix + 
validatedConfigValue.name(), validatedConfigValue.value(),
-  
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-if (!configValue.errorMessages().isEmpty()) {
-errorCount++;
+return prefixedConfigInfos(configDef.configKeys(), configValues, 
prefix);
+}
+
+private static ConfigInfos prefixedConfigInfos(Map 
configKeys, List configValues, String prefix) {
+int errorCount = 0;
+Set groups = new LinkedHashSet<>();
+List configInfos = new ArrayList<>();
+
+if (configValues == null) {

Review Comment:
   `ConfigDef::validate` is non-final, and plugin instances may return a 
subclass from their `config` methods that possibly returns null.
   
   I acknowledge that this is extremely unlikely, but it seems like this null 
guard is the best way to handle that scenario as opposed to, e.g., throwing an 
error and causing a 500 response to be returned. Thoughts?
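   A contrived example of the scenario being guarded against (hypothetical class, 
just to show how a null list could legally reach the herder):
   ```java
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigValue;

   public class SloppyConfigDef extends ConfigDef {
       @Override
       public List<ConfigValue> validate(Map<String, String> props) {
           return null; // legal for a subclass, hence the null check above
       }
   }
   ```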



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586828276


##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -206,8 +151,16 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
-public static Map parseVoterConnections(List 
voterEntries) {
-Map voterMap = new HashMap<>();
+public static Map 
parseVoterConnections(List voterEntries) {
+return parseVoterConnections(voterEntries, true);
+}
+
+public static Set parseVoterIds(List voterEntries) {
+return parseVoterConnections(voterEntries, false).keySet();
+}
+
+private static Map 
parseVoterConnections(List voterEntries, boolean routableOnly) {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586825107


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -181,20 +181,12 @@ class KafkaRaftManager[T](
   private val clientDriver = new KafkaRaftClientDriver[T](client, 
threadNamePrefix, fatalFaultHandler, logContext)
 
   def startup(): Unit = {
-// Update the voter endpoints (if valid) with what's in RaftConfig
-val voterAddresses: util.Map[Integer, AddressSpec] = 
controllerQuorumVotersFuture.get()
-for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
-  voterAddressEntry.getValue match {
-case spec: InetAddressSpec =>
-  netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
-case _: UnknownAddressSpec =>
-  info(s"Skipping channel update for destination ID: 
${voterAddressEntry.getKey} " +
-s"because of non-routable endpoint: 
${NON_ROUTABLE_ADDRESS.toString}")
-case invalid: AddressSpec =>
-  warn(s"Unexpected address spec (type: ${invalid.getClass}) for 
channel update for " +
-s"destination ID: ${voterAddressEntry.getKey}")
-  }
-}
+client.initialize(
+  controllerQuorumVotersFuture.get(),
+  config.controllerListenerNames.head,
+  new FileBasedStateStore(new File(dataDir, "quorum-state")),

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586802072


##
raft/src/main/java/org/apache/kafka/raft/internals/History.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A object tracks values of {@code T} at different offsets.
+ */
+public interface History {

Review Comment:
   Okay. Went with `LogHistory`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2089079055

   > > > Here I have a comment, I could not put at the right location in the 
code:
   > > > On line 1362, in commitSync() the consumer waits on the commitFuture 
with a timer. I think, it should not wait on a timer there since we already 
wait on a timer in the background thread.
   > > 
   > > 
   > > I agree. What about the timed wait in 
awaitPendingAsyncCommitsAndExecuteCommitCallbacks()?
   > 
   > Agree we should not wait on the `commitFuture` with a timer because the 
deadline is contained in the event we submitted, and already enforced by the 
reaper, and not clear about what the proposed relationship with 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks` is??
   > 
   > I would expect we only need to call 
`ConsumerUtils.getResult(commitFuture);`, and that is consistent with how we 
get results for all other completable events now:
   > 
   > * we create an event with a deadline
   > * we call `applicationEventHandler.addAndGet(event)`
   > 
   > For the commit case that flow has a different shape just because we use 
`applicationEventHandler.add(event)` 
[here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L775),
 to cater for commit sync and async, but we should still apply the same 
approach and just call get without any time boundary I would say.
   
   Here's my reasoning on the need for a `Timer`-based `get()` in 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks()`... 
   
   The `Future` that's referenced in `lastPendingAsyncCommit` comes from an 
`AsyncCommitEvent` and has a hard-coded deadline of `Long.MAX_VALUE`. As such, 
the `CompetableEventReaper` in the network thread will never prune that event. 
Without a timeout when calling `get()` on the `lastPendingAsyncCommit`, the 
caller could hang for up to `request.timeout.ms` while we wait for the network 
I/O request to complete (or timeout).
   
   @cadonna @lianetm @lucasbru—does that make sense? CMIIW, please 
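   
   For illustration, a tiny self-contained sketch of why the bounded `get()` 
matters (the future here is a stand-in for `lastPendingAsyncCommit`, which never 
expires on its own because its deadline is `Long.MAX_VALUE`):
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class BoundedCommitWaitSketch {
       public static void main(String[] args) throws Exception {
           CompletableFuture<Void> lastPendingAsyncCommit = new CompletableFuture<>();

           long remainingMs = 100; // what the caller's Timer would report
           try {
               // Bounding get() keeps the caller from hanging for up to
               // request.timeout.ms while the network thread finishes the request.
               lastPendingAsyncCommit.get(remainingMs, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               System.out.println("gave up after " + remainingMs + " ms, as intended");
           }
       }
   }
   ```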


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586782977


##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+final public class VoterSetHistoryTest {

Review Comment:
   Yes. Good catch. Added two more tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586765460


##
raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java:
##
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Arrays;
+import java.util.Optional;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.MockLog;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class KRaftControlRecordStateMachineTest {
+private static final RecordSerde STRING_SERDE = new StringSerde();
+
+private static MockLog buildLog() {
+return new MockLog(new TopicPartition("partition", 0), 
Uuid.randomUuid(), new LogContext());
+}
+
+private static KRaftControlRecordStateMachine 
buildPartitionListener(MockLog log, Optional staticVoterSet) {
+return new KRaftControlRecordStateMachine(
+staticVoterSet,
+log,
+STRING_SERDE,
+BufferSupplier.NO_CACHING,
+1024,
+new LogContext()
+);
+}
+
+@Test
+void testEmptyParition() {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586760874


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##
@@ -216,4 +140,132 @@ private void appendBatches(List> 
batches) {
 batches.forEach(CompletedBatch::release);
 }
 }
+
+final public static class Builder {
+private long lastContainedLogTimestamp = 0;
+private CompressionType compressionType = CompressionType.NONE;
+private Time time = Time.SYSTEM;
+private int maxBatchSize = 1024;
+private MemoryPool memoryPool = MemoryPool.NONE;
+private short kraftVersion = 0;

Review Comment:
   In practice `KafkaRaftClient` will override all of these defaults. I set the 
default to what would make sense to use in most cases in `**/src/test`. This is 
why `maxBatchSize` is 1024 for example.
   
   We use a similar strategy in the `metadata` module. The builder's default is 
what would make sense in most tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842765#comment-17842765
 ] 

Lianet Magrans commented on KAFKA-16514:


Just to answer the question above:
{quote}what is the purpose of -2 code? In the end, not sending any request, 
with a large enough session timeout, no rebalance would be triggered anyway? 
What does change is we send -2 instead of just not sending any leaver group 
request on close()?{quote}

The purpose is just to set the intention explicitly at the protocol level (and 
not assume it). This is mainly to allow for richer semantics around the static 
membership leave in the future. It does not make a difference at the moment 
(over not sending the leave group), but it does allow us to cleanly extend the 
current logic if we ever want to, and to let static members leave permanently 
by sending a -1 epoch on the leave group. That would effectively allow removing 
a static member from a group (which today can only be achieved either by 
waiting for the session timeout to expire, or via the admin API).

Anyways, that's just food for thought for now. The KIP extending the consumer 
close with options seems sensible to solve the current situation, and would 
align nicely with any future extension of the static leave semantics if we ever 
go down that path. 

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586731329


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It provides functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exists, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();
+}
+
+HashMap newVoters = new HashMap<>(voters);
+newVoters.put(voter.id(), voter);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+/**
+ * Removes a voter from the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
removed.
+ *
+ * A voter can be removed from the voter set if its id and uuid match.
+ *
+ * @param voterId the voter id

Review Comment:
   Okay. I'll use `VoterKey`. I was conflicted about using `VoterKey` instead 
of `int , Optional` throughout the code, but probably using `VoterKey` as 
much as possible is the right decision.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586632167


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1169,8 +1129,7 @@ private Map 
beginningOrEndOffset(Collection 
offsetAndTimestampMap;
 offsetAndTimestampMap = applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
+listOffsetsEvent);

Review Comment:
   Moved the `listOffsetsEvent` up to the previous line. Missed it on first 
read, sorry 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586626516


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -987,6 +987,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol 
groupProtocol) {
 @ParameterizedTest
 @EnumSource(GroupProtocol.class)
 public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
+Time time = new MockTime(1);

Review Comment:
   Done.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompletableEventReaperTest {
+
+private final LogContext logContext = new LogContext();
+private final Time time = new MockTime(0, 0, 0);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586622390


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean 
swallowException) {
 if (applicationEventHandler != null)
 closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
 closeTimer.update();
+
+// close() can be called from inside one of the constructors. In that 
case, it's possible that neither
+// the reaper nor the background event queue were constructed, so 
check them first to avoid NPE.
+if (backgroundEventReaper != null && backgroundEventQueue != null) {
+// Copy over the completable events to a separate list, then reap 
any incomplete
+// events on that list.
+LinkedList allEvents = new LinkedList<>();

Review Comment:
   Yes, because it has the `drainTo()` method. However, this code is now gone, 
so it's moot 🤷‍♂️



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586620323


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Objects.requireNonNull;
 
+/**
+ * {@code CompletableEvent} is an interface that is used by both {@link 
CompletableApplicationEvent} and
+ * {@link CompletableBackgroundEvent} for common processing and logic. A 
{@code CompletableEvent} is one that
+ * allows the caller to get the {@link #future() future} related to the event 
and the event's
+ * {@link #deadlineMs() expiration timestamp}.
+ *
+ * @param  Return type for the event when completed
+ */
 public interface CompletableEvent {
 
+/**
+ * Returns the {@link CompletableFuture future} associated with this 
event. Any event will have some related
+ * logic that is executed on its behalf. The event can complete in one of 
the following ways:
+ *
+ * 
+ * 
+ * Success: when the logic for the event completes successfully, 
the data generated by that event
+ * (if applicable) is passed to {@link 
CompletableFuture#complete(Object)}. In the case where the generic
+ * bound type is specified as {@link Void}, {@code null} is 
provided.
+ * 
+ * Error: when the event logic generates an error, the error 
is passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Timeout: when the time spent executing the event logic exceeds 
the {@link #deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} should be created and 
passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Cancelled: when an event remains incomplete when the consumer 
closes, the future will be
+ * {@link CompletableFuture#cancel(boolean) cancelled}. Attempts 
to {@link Future#get() get the result}

Review Comment:
   Good catch!
   
   I had been using `cancel()`, but noticed that the message in the exception 
the caller of `Future.get()` later received was unhelpful. Yes, `cancel()` 
calls `completeExceptionally(new CancellationException())`, but I wanted the 
exception to include a (hopefully) meaningful message.
   
   Anyhoo... I've updated the documentation to reflect that change.
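   
   For anyone following along, a rough, self-contained sketch (not the PR's actual code) of the difference the caller of `Future.get()` observes between the two approaches:
   
```java
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CancellationMessageSketch {
    public static void main(String[] args) {
        // Cancelled via cancel(): the caller later sees a CancellationException with no message.
        CompletableFuture<String> viaCancel = new CompletableFuture<>();
        viaCancel.cancel(true);

        // Completed exceptionally with a descriptive CancellationException instead.
        CompletableFuture<String> viaCompleteExceptionally = new CompletableFuture<>();
        viaCompleteExceptionally.completeExceptionally(
                new CancellationException("Consumer is closing; event was not processed"));

        for (CompletableFuture<String> future : Arrays.asList(viaCancel, viaCompleteExceptionally)) {
            try {
                future.get();
            } catch (CancellationException | InterruptedException | ExecutionException e) {
                // Prints "null" for the first future and the descriptive message for the second.
                System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
            }
        }
    }
}
```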



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586620323


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Objects.requireNonNull;
 
+/**
+ * {@code CompletableEvent} is an interface that is used by both {@link 
CompletableApplicationEvent} and
+ * {@link CompletableBackgroundEvent} for common processing and logic. A 
{@code CompletableEvent} is one that
+ * allows the caller to get the {@link #future() future} related to the event 
and the event's
+ * {@link #deadlineMs() expiration timestamp}.
+ *
+ * @param  Return type for the event when completed
+ */
 public interface CompletableEvent {
 
+/**
+ * Returns the {@link CompletableFuture future} associated with this 
event. Any event will have some related
+ * logic that is executed on its behalf. The event can complete in one of 
the following ways:
+ *
+ * 
+ * 
+ * Success: when the logic for the event completes successfully, 
the data generated by that event
+ * (if applicable) is passed to {@link 
CompletableFuture#complete(Object)}. In the case where the generic
+ * bound type is specified as {@link Void}, {@code null} is 
provided.
+ * 
+ * Error: when the event logic generates an error, the error 
is passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Timeout: when the time spent executing the event logic exceeds 
the {@link #deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} should be created and 
passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * 
+ * 
+ * Cancelled: when an event remains incomplete when the consumer 
closes, the future will be
+ * {@link CompletableFuture#cancel(boolean) cancelled}. Attempts 
to {@link Future#get() get the result}

Review Comment:
   Good catch!
   
   I had been using `cancel()`, but noticed that the message in the exception 
the caller of `Future.get()` later received was unhelpful. Yes, `cancel()` 
calls `completeExceptionally(new CancellationException())`, but I wanted the 
exception to carry a (hopefully) meaningful message.
   
   Anyhoo... I've updated the documentation to reflect that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586607598


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent {
  */
 private final Map offsets;
 
-protected CommitEvent(final Type type, final Map offsets, final Timer timer) {
-super(type, timer);
-this.offsets = validate(offsets);
-}
-
-protected CommitEvent(final Type type, final Map offsets, final long deadlineMs) {
+public CommitEvent(final Type type, Map 
offsets, final long deadlineMs) {

Review Comment:
   Added back `final` and changed back to `protected`. Not sure how/why I 
changed those 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586603709


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
 log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
 } finally {
 sendUnsentRequests(timer);
+
+LinkedList allEvents = new LinkedList<>();
+applicationEventQueue.drainTo(allEvents);
+List> completableEvents = allEvents
+.stream()
+.filter(e -> e instanceof CompletableApplicationEvent)
+.map(e -> (CompletableApplicationEvent) e)
+.collect(Collectors.toList());

Review Comment:
   Pulled the logic to `reapIncomplete()` as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591574


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String 
connName,
 ConnectorClientConfigRequest connectorClientConfigRequest = new 
ConnectorClientConfigRequest(
 connName, connectorType, connectorClass, clientConfigs, 
clientType);
 List configValues = 
connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-if (configValues != null) {
-for (ConfigValue validatedConfigValue : configValues) {
-ConfigKey configKey = 
configKeys.get(validatedConfigValue.name());
-ConfigKeyInfo configKeyInfo = null;
-if (configKey != null) {
-if (configKey.group != null) {
-groups.add(configKey.group);
-}
-configKeyInfo = convertConfigKey(configKey, prefix);
-}
 
-ConfigValue configValue = new ConfigValue(prefix + 
validatedConfigValue.name(), validatedConfigValue.value(),
-  
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-if (!configValue.errorMessages().isEmpty()) {
-errorCount++;
+return prefixedConfigInfos(configDef.configKeys(), configValues, 
prefix);
+}
+
+private static ConfigInfos prefixedConfigInfos(Map 
configKeys, List configValues, String prefix) {
+int errorCount = 0;
+Set groups = new LinkedHashSet<>();
+List configInfos = new ArrayList<>();
+
+if (configValues == null) {
+return new ConfigInfos("", 0, new ArrayList<>(groups), 
configInfos);
+}
+
+for (ConfigValue validatedConfigValue : configValues) {
+ConfigKey configKey = configKeys.get(validatedConfigValue.name());
+ConfigKeyInfo configKeyInfo = null;
+if (configKey != null) {
+if (configKey.group != null) {
+groups.add(configKey.group);
 }
-ConfigValueInfo configValueInfo = 
convertConfigValue(configValue, configKey != null ? configKey.type : null);
-configInfoList.add(new ConfigInfo(configKeyInfo, 
configValueInfo));
+configKeyInfo = convertConfigKey(configKey, prefix);
+}
+
+ConfigValue configValue = new ConfigValue(prefix + 
validatedConfigValue.name(), validatedConfigValue.value(),
+validatedConfigValue.recommendedValues(), 
validatedConfigValue.errorMessages());
+if (configValue.errorMessages().size() > 0) {
+errorCount++;
 }
+ConfigValueInfo configValueInfo = convertConfigValue(configValue, 
configKey != null ? configKey.type : null);
+configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
 }
-return new ConfigInfos(connectorClass.toString(), errorCount, new 
ArrayList<>(groups), configInfoList);
+return new ConfigInfos("", errorCount, new ArrayList<>(groups), 
configInfos);

Review Comment:
   Yeah, it's a little weird with the empty string here. Hopefully it's fine 
for now but if we continue augmenting and refactoring this class I agree that 
it might be worth changing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591136


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map 
validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+/**
+ * General-purpose validation logic for converters that are configured 
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be 
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+ *  may be null, in which case no validation will 
be performed under the assumption that the
+ *  connector will use inherit the converter 
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ *(e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+ *may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+ *  from an instance of the plugin type (e.g., 
{@code Converter::config});
+ *  may not be null
+ * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+ *   may not be null
+ * @param pluginProperty the property used to define a custom class for 
the plugin type
+ *   in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ *   may not be null
+ * @param defaultProperties any default properties to include in the 
configuration that will be used for
+ *  the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+ * config)
+
+ * @param  the plugin class to perform validation for
+ */
+private  ConfigInfos validateConverterConfig(
+Map connectorConfig,
+ConfigValue pluginConfigValue,
+Class pluginInterface,
+Function configDefAccessor,
+String pluginName,
+String pluginProperty,
+Map defaultProperties
+) {
+Objects.requireNonNull(connectorConfig);
+Objects.requireNonNull(pluginInterface);
+Objects.requireNonNull(configDefAccessor);
+Objects.requireNonNull(pluginName);
+Objects.requireNonNull(pluginProperty);
+
+String pluginClass = connectorConfig.get(pluginProperty);
+
+if (pluginClass == null
+|| pluginConfigValue == null
+|| !pluginConfigValue.errorMessages().isEmpty()
+) {
+// Either no custom converter was specified, or one was specified 
but there's a problem with it.
+// No need to proceed any further.
+return null;
+}
+
+T pluginInstance;
+try {
+pluginInstance = Utils.newInstance(pluginClass, pluginInterface);

Review Comment:
   I think this is actually correct. All calls to `validateConverterConfig` 
take place within a `LoaderSwap` that causes the connector's classloader to be 
used, which unless I'm mistaken matches the behavior when instantiating tasks 
(loader swap 
[here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L654),
 converter instantiation 
[here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L666-L674)).
 It's true that `Plugins::newConverter` and `Plugins::newHeaderConverter` are 
used instead of `Utils::newInstance` when starting tasks, but when invoking the 
`Plugins` methods with `classLoaderUsage` set to `CURRENT_CLASSLOADER`, no 
classloader swapping takes place, so the connector loader is still used.
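   
   To make the swap-and-restore pattern concrete, here is a generic, self-contained sketch of what a loader swap does (the class name is illustrative only, not Connect's actual `LoaderSwap`/`Plugins` implementation):
   
```java
// Illustrative sketch of the context-classloader swap pattern; this class is
// hypothetical and not part of the Connect runtime.
public final class PluginClassLoaderSwap implements AutoCloseable {
    private final ClassLoader previous;

    private PluginClassLoaderSwap(ClassLoader previous) {
        this.previous = previous;
    }

    // Point the current thread's context classloader at the plugin's loader,
    // remembering whichever loader was active before.
    public static PluginClassLoaderSwap swapTo(ClassLoader pluginLoader) {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(pluginLoader);
        return new PluginClassLoaderSwap(previous);
    }

    @Override
    public void close() {
        // Restore the original loader when the try-with-resources block exits.
        Thread.currentThread().setContextClassLoader(previous);
    }
}
```
   
   Any reflective instantiation done inside a `try (PluginClassLoaderSwap swap = PluginClassLoaderSwap.swapTo(connectorLoader)) { ... }` block then resolves classes against the connector's classloader, which is the behavior described above.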



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map 
validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+/**
+ * General-purpose validation logic for converters that are configured 
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be 
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+ *  

Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586590857


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -562,8 +709,13 @@ ConfigInfos validateConnectorConfig(
 configKeys.putAll(configDef.configKeys());
 allGroups.addAll(configDef.groups());
 configValues.addAll(config.configValues());
-ConfigInfos configInfos =  generateResult(connType, configKeys, 
configValues, new ArrayList<>(allGroups));
 
+// do custom converter-specific validation
+ConfigInfos headerConverterConfigInfos = 
validateHeaderConverterConfig(connectorProps, 
validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG));
+ConfigInfos keyConverterConfigInfos = 
validateKeyConverterConfig(connectorProps, 
validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG));
+ConfigInfos valueConverterConfigInfos = 
validateValueConverterConfig(connectorProps, 
validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG));

Review Comment:
   Yeah, the alternative was to pass in the entire `validatedConnectorConfig` 
and let the various `validateXxxConverterConfig` methods pull out the relevant 
`ConfigValue` field. But this seemed strange considering those methods only 
require a single `ConfigValue` object.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map 
validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+/**
+ * General-purpose validation logic for converters that are configured 
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be 
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+ *  may be null, in which case no validation will 
be performed under the assumption that the
+ *  connector will inherit the converter 
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ *(e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+ *may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+ *  from an instance of the plugin type (e.g., 
{@code Converter::config});
+ *  may not be null
+ * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+ *   may not be null
+ * @param pluginProperty the property used to define a custom class for 
the plugin type
+ *   in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ *   may not be null
+ * @param defaultProperties any default properties to include in the 
configuration that will be used for
+ *  the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+ * config)
+
+ * @param  the plugin class to perform validation for
+ */
+private  ConfigInfos validateConverterConfig(
+Map connectorConfig,
+ConfigValue pluginConfigValue,
+Class pluginInterface,
+Function configDefAccessor,
+String pluginName,
+String pluginProperty,
+Map defaultProperties
+) {
+Objects.requireNonNull(connectorConfig);
+Objects.requireNonNull(pluginInterface);
+Objects.requireNonNull(configDefAccessor);
+Objects.requireNonNull(pluginName);
+Objects.requireNonNull(pluginProperty);
+
+String pluginClass = connectorConfig.get(pluginProperty);
+
+if (pluginClass == null
+|| pluginConfigValue == null
+|| !pluginConfigValue.errorMessages().isEmpty()
+) {
+// Either no custom converter was specified, or one was specified 
but there's a problem with it.
+// No need to proceed any further.
+return null;
+}
+
+T pluginInstance;
+try {
+pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+} catch (ClassNotFoundException | RuntimeException e) {
+log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", pluginName, pluginClass, e);
+pluginConfigValue.addErrorMessage("Failed to load class " + 

[jira] [Commented] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”

2024-05-01 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842730#comment-17842730
 ] 

Philip Nee commented on KAFKA-16022:


hi [~phuctran] - I believe this came up during integration testing.  You can 
try to see if the fetch request manager test also emits this error.

> AsyncKafkaConsumer sometimes complains “No current assignment for partition 
> {}”
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This seems to be a timing issue that before the member receives any 
> assignment from the coordinator, the fetcher will try to find the current 
> position causing "No current assignment for partition {}".  This creates a 
> small amount of noise to the log.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586559943


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
 log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
 } finally {
 sendUnsentRequests(timer);
+
+LinkedList allEvents = new LinkedList<>();
+applicationEventQueue.drainTo(allEvents);
+List> completableEvents = allEvents
+.stream()
+.filter(e -> e instanceof CompletableApplicationEvent)
+.map(e -> (CompletableApplicationEvent) e)
+.collect(Collectors.toList());

Review Comment:
    Yes, I went back and forth on this at least three times during 
development. I'll look at switching back to the approach you suggest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586557142


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   I made the documentation changes you requested and removed the logging to 
make the logic simpler.
   
   When you state that it seems like "overkill to have all this code for 
something we don't need now, or know if we we'll need some day)," I'm a bit 
confused 🤔 Because unsubscribing may require invoking 
`ConsumerRebalanceListener` callbacks, we need a way to check and run those 
events that are coming from the background thread, right?
   
   I do agree that it's overkill to have this broken out as a separate method 
since it's only used for the `unsubscribe()` case. IIRC, there was some talk of 
another use case for this, and it does make unit testing it easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586557142


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   I made the documentation changes you requested and removed the logging to 
make the logic simpler.
   
   When you state that it seems like "overkill to have all this code for 
something we don't need now, or know if we we'll need some day," I'm a bit 
confused 🤔 Because unsubscribing may require invoking 
`ConsumerRebalanceListener` callbacks, we need a way to check and run those 
events that are coming from the background thread, right?
   
   I do agree that it's overkill to have this broken out as a separate method 
since it's only used for the `unsubscribe()` case. IIRC, there was some talk of 
another use case for this, and it does make unit testing it easier.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection 
topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor 
eventProcessor,

Review Comment:
   I made the documentation changes you requested and removed the logging to 
make the logic simpler.
   
   When you state that it seems like "overkill to have all this code for 
something we don't need now, or know if we we'll need some day," I'm a bit 
confused 🤔 Because unsubscribing may require invoking 
`ConsumerRebalanceListener` callbacks, we need a way to check and run those 
events that are coming from the background thread, right?
   
   I do agree that it's overkill to have this broken out as a separate method 
since it's only used for the `unsubscribe()` case. IIRC, there was some talk of 
another use case for this, and it does make unit testing it easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-05-01 Thread via GitHub


chia7712 commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2088823785

   > I have the correct file still saved on my new forced update but I can't 
link it to this issue. I don't want to revert because I accidentally pushed all 
the commits from the past few months as part of this one.
   
   maybe you can file a new PR ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2024-05-01 Thread Alexander Aghili (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842727#comment-17842727
 ] 

Alexander Aghili commented on KAFKA-16027:
--

Hi [~johnnyhsu], I have added the feature but messed up with some git 
things. See: https://github.com/apache/kafka/pull/15055
My fork on my Kafka-16027 branch has the new implementation, so I don't think 
this is necessary to work on, but if you want to help sort out the issues 
related to making a pull request, please feel free.

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Assignee: Alexander Aghili
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long.  I think it is 
> pretty close to the 160 line method limit - I tried to modify it but it 
> would hit the limit when I tried to break things into separate lines.
> The test also contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586553978


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It provides functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exists, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();

Review Comment:
   But we are checking the invariant here. Let me give an example. Let's say 
that the set of voters is `(0, DirId0), (1, DirId1), (2, DirId2)`.
   
   If `(2, DirId2')` (notice that the replica id is the same but the directory 
id is different) tries to join the set of voters, the leader will call 
`VoterSet::addVoter((2, DirId2'))`. This call will return `Optional.empty()`. 
Meaning that the voter was not added.
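   
   (As a self-contained illustration of that example, using plain `java.util` types rather than the PR's `VoterSet`/`VoterNode` classes — the helper below is a stand-in, not the actual implementation:)
   
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class AddVoterSketch {
    // Directory id keyed by replica id, e.g. {0=DirId0, 1=DirId1, 2=DirId2}.
    static Optional<Map<Integer, String>> addVoter(Map<Integer, String> voters,
                                                   int replicaId,
                                                   String directoryId) {
        if (voters.containsKey(replicaId)) {
            // The same replica id, even with a different directory id, is rejected.
            return Optional.empty();
        }
        Map<Integer, String> updated = new HashMap<>(voters);
        updated.put(replicaId, directoryId);
        return Optional.of(updated);
    }

    public static void main(String[] args) {
        Map<Integer, String> voters = new HashMap<>();
        voters.put(0, "DirId0");
        voters.put(1, "DirId1");
        voters.put(2, "DirId2");
        // Attempting to add (2, DirId2') returns Optional.empty(), i.e. the voter is not added.
        System.out.println(addVoter(voters, 2, "DirId2'"));
    }
}
```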
   
   I think I understand the source of the confusion. Maybe `addVoter` should 
return different values if `replica id` already exists vs `(replica id, replica 
directory id)` already exists. The KIP doesn't distinguish between these two 
cases:
   > DUPLICATE_VOTER - when the request contains a replica id is already in the 
committed set of voters. Note that this means that duplicate replica ids are 
not allowed. This is useful to make automatic voter addition safer.
   
   from [AddVoter 
handling](https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Handling.3).
   
   Do you mind if we revisit this when we implement [AddVoter RPC and request 
handling](https://issues.apache.org/jira/browse/KAFKA-16535)? I mainly added 
this so that I can write some of the tests in `VoterSetTest`. This code is not 
used by `raft/src/main`.
   
   Note that to implement the `UpdateVoter` RPC I will probably add a 
`VoterSet::updateVoter` method that will implement the invariants of that 
operation.
   > If the replica id tracked doesn't have a replica directory id, update it 
with the replica directory id provided in the request.
   
   from [UpdateVoter 

Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2024-05-01 Thread via GitHub


Alexander-Aghili commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2088813846

   I have the correct file still saved on my new forced update but I can't link 
it to this issue. I don't want to revert because I accidentally pushed all the 
commits from the past few months as part of this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586542049


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection 
topics, Optional processor) {

Review Comment:
   I renamed `process()` as `processBackgroundEvents()`. Is that OK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586539952


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that we are candidates to expire or cancel when 
reviewed.

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586537358


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,

Review Comment:
   I reworked the comments/documentation to avoid that altogether. PTAL. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-05-01 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842724#comment-17842724
 ] 

Chia-Ping Tsai commented on KAFKA-16223:


{quote}
I have another set of tests already migrated and plan to open the second PR 
soon.
{quote}

this is great! Let's review/merge your PR first. And we will take over 
remaining if you have no enough time.

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586526016


##
raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java:
##
@@ -261,7 +260,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
  * @param snapshotId the end offset and epoch that identifies the snapshot
  * @return a writable snapshot if it doesn't already exist
  */
-Optional storeSnapshot(OffsetAndEpoch snapshotId);
+Optional createNewSnapshotUnchecked(OffsetAndEpoch 
snapshotId);

Review Comment:
   So the use cases are very different, which is why I separated the 
methods.
   
   `createNewSnapshot` is indirectly called by the KRaft state machine. In this 
case the `ReplicatedLog` makes sure that the provided snapshot id (offset and 
epoch) is consistent with the log.
   
   `createNewSnapshotUnchecked` is used by the local replica to override the 
entire log with the leader's snapshot. For example, right after downloading the 
snapshot we fully truncate the log: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459-L1473
   
   This part of the code calls `truncateToLatestSnapshot` which deletes all of 
the old snapshots and replaces the log with an empty log that starts at the just 
downloaded snapshot (the latest snapshot). In other words 
`truncateToLatestSnapshot` makes the `ReplicatedLog` consistent after the direct 
use of `createNewSnapshotUnchecked`.
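   
   To restate the split in (very) simplified form, a generic sketch with stand-in types — this is not Kafka's actual `ReplicatedLog` API, and the validation rule shown is only an example of the kind of consistency check the checked path performs:
   
```java
import java.util.Optional;

// Placeholder for a writable snapshot handle; not the actual Kafka type.
class SnapshotWriter { }

interface SnapshotCapableLog {
    long logEndOffset();

    // Unchecked path: used when the replica replaces its whole log with a snapshot
    // fetched from the leader; the caller restores consistency afterwards, e.g. by
    // truncating the log to the new snapshot.
    Optional<SnapshotWriter> createNewSnapshotUnchecked(long snapshotEndOffset);

    // Checked path: invoked (indirectly) by the state machine, so the log first
    // verifies that the requested snapshot id is consistent with its own contents.
    default Optional<SnapshotWriter> createNewSnapshot(long snapshotEndOffset) {
        if (snapshotEndOffset > logEndOffset()) {
            return Optional.empty(); // reject ids the log cannot vouch for
        }
        return createNewSnapshotUnchecked(snapshotEndOffset);
    }
}
```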



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586524076


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,

Review Comment:
   Unfortunately, the term "processed" is sufficiently ambiguous. So we're 
_both_ right.
   
   Here, I'm referring to events that had been passed to the `EventProcessor`'s 
[process()](https://github.com/apache/kafka/blob/74a7ed78cc69f0d28bd18139b90c28468058e111/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java#L40)
 method.
   
   Which, sadly, isn't even correct, because they're being `add()`-ed to the 
reaper _before_ they're passed to `EventProcessor.process()` 🤦‍♂️



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [WIP] KAFKA-16027: refactor MetadataTest [kafka]

2024-05-01 Thread via GitHub


johnnychhsu opened a new pull request, #15842:
URL: https://github.com/apache/kafka/pull/15842

   Jira: https://issues.apache.org/jira/browse/KAFKA-16027
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]

2024-05-01 Thread via GitHub


hgeraldino opened a new pull request, #15841:
URL: https://github.com/apache/kafka/pull/15841

   This is the last remaining Kafka Connect test that needs migration from 
PowerMock/EasyMock to Mockito.
   
   Previous PR: https://github.com/apache/kafka/pull/15520
   
   As usual, I look forward to your comments and feedback @C0urante 
@gharris1727 @clolov @mukkachaitanya @chia7712 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15309: Add custom error handler to Producer [kafka]

2024-05-01 Thread via GitHub


aliehsaeedii closed pull request #15731: KAFKA-15309: Add custom error handler 
to Producer
URL: https://github.com/apache/kafka/pull/15731


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-05-01 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586499310


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link 
CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. 
This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+private final Logger log;
+
+/**
+ * List of tracked events that are candidates to expire or cancel when reviewed.
+ */
+private final List<T> tracked;
+
+public CompletableEventReaper(LogContext logContext) {
+this.log = logContext.logger(CompletableEventReaper.class);
+this.tracked = new ArrayList<>();
+}
+
+/**
+ * Adds a new {@link CompletableEvent event} to track for later 
completion/expiration.
+ *
+ * @param event Event to track
+ */
+public void add(T event) {
+tracked.add(Objects.requireNonNull(event, "Event to track must be 
non-null"));
+}
+
+/**
+ * This method "completes" any {@link CompletableEvent}s that have either 
expired or completed normally. So this
+ * is a two-step process:
+ *
+ * <ol>
+ * <li>
+ * For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+ * instance of {@link TimeoutException} is created and passed to
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ * </li>
+ * <li>
+ * For each tracked event of which its {@link CompletableEvent#future() future} is already in the
+ * {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+ * </li>
+ * </ol>
+ *
+ * <p>
+ *
+ * This method should be called at regular intervals, based upon the needs 
of the resource that owns the reaper.
+ *
+ * @param currentTimeMs Current time with which to compare 
against the
+ *  {@link CompletableEvent#deadlineMs() 
expiration time}
+ */
+public void reapExpiredAndCompleted(long currentTimeMs) {
+log.trace("Reaping expired events");
+
+Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+TimeoutException error = new TimeoutException(String.format("%s 
could not be completed within its timeout", e.getClass().getSimpleName()));
+long pastDueMs = currentTimeMs - e.deadlineMs();
+log.debug("Completing event {} exceptionally since it expired {} 
ms ago", e, pastDueMs);
+CompletableFuture<?> f = e.future();
+f.completeExceptionally(error);
+};
+
+// First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
+tracked.stream()
+.filter(e -> !e.future().isDone())
+.filter(e -> currentTimeMs > e.deadlineMs())
+.forEach(timeoutEvent);
+// Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
+// include any events that finished successfully as well as any events 
we just completed exceptionally above.
+tracked.removeIf(e -> e.future().isDone());

Review Comment:
   My first attempt at this resulted in a `ConcurrentModificationException`, since we're removing each entry from the very same list we're iterating over.
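
   To illustrate the pitfall in isolation (a minimal, self-contained sketch, not the PR's actual first attempt):

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class RemoveWhileIteratingSketch {
    public static void main(String[] args) {
        List<String> tracked = new ArrayList<>(List.of("a", "b", "c"));

        // Structurally modifying the list while the enhanced-for loop's
        // iterator is active fails fast on the next iteration.
        try {
            for (String e : tracked) {
                if (e.equals("a"))
                    tracked.remove(e);
            }
        } catch (ConcurrentModificationException expected) {
            System.out.println("CME as expected: " + expected);
        }

        // removeIf mutates through the list's own iterator, so it is safe.
        tracked.removeIf(e -> e.equals("b"));
        System.out.println(tracked); // [c]
    }
}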

[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-05-01 Thread Hector Geraldino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842717#comment-17842717
 ] 

Hector Geraldino commented on KAFKA-16223:
--

Thanks [~chia7712] [~cmukka20] for following up on this.

As you've already noticed, the strategy I took was to create a separate 
[KafkaConfigBackingStoreMockitoTest|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java]
 and migrate the tests in batches to facilitate reviews. My original plan was 
to do this in 3 separate batches, the [first 
batch|https://github.com/apache/kafka/pull/15520] was merged a few weeks ago, 
and I have another set of tests already migrated and plan to open the second PR 
soon.

If you guys want to pick up the remaining tests, that's fine by me. I haven't had 
enough time these past few weeks to work on this, but I'd love to see this 
completed (and the migration to JUnit 5 started) before 3.8.

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16652) add unit test for ClusterTemplate offering zero ClusterConfig

2024-05-01 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu reassigned KAFKA-16652:
---

Assignee: TaiJuWu  (was: Chia-Ping Tsai)

> add unit test for ClusterTemplate offering zero ClusterConfig
> -
>
> Key: KAFKA-16652
> URL: https://issues.apache.org/jira/browse/KAFKA-16652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>
> https://github.com/apache/kafka/blob/31355ef8f948f369e240ebc203f889f187116d75/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java#L94
> If `ClusterTemplate` does not generate any `ClusterConfig`, we will throw an 
> exception. However, we don't have a UT for such a scenario currently.
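
A rough sketch of what such a unit test might look like, modelled on the ClusterTestExtensionsUnitTest quoted elsewhere in this archive (PR #15800); the processClusterTemplate signature, the template name, and the expected exception type are assumptions here, not the final implementation:

package kafka.test.junit;

import kafka.test.annotation.ClusterTemplate;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

import java.util.function.Consumer;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ClusterTemplateZeroConfigSketch {

    @Test
    @SuppressWarnings("unchecked")
    void testClusterTemplateYieldingZeroConfigs() {
        ClusterTestExtensions ext = new ClusterTestExtensions();
        ExtensionContext context = mock(ExtensionContext.class);
        Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);

        ClusterTemplate annot = mock(ClusterTemplate.class);
        // Hypothetical template method that generates no ClusterConfig instances.
        when(annot.value()).thenReturn("zeroConfigTemplate");

        // Assumed behaviour: the extension rejects a template that produced zero
        // configs; the exact exception type may differ in the real implementation.
        Assertions.assertThrows(IllegalStateException.class,
            () -> ext.processClusterTemplate(context, annot, testInvocations));
    }
}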



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-01 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1586489784


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord(
 builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
 }
 }
+
+public static MemoryRecords withKRaftVersionRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+KRaftVersionRecord kraftVersionRecord
+) {
+writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, 
kraftVersionRecord);
+buffer.flip();
+return MemoryRecords.readableRecords(buffer);
+}
+
+private static void writeKRaftVersionRecord(
+ByteBuffer buffer,
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+KRaftVersionRecord kraftVersionRecord
+) {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+TimestampType.CREATE_TIME, initialOffset, timestamp,
+RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
+false, true, leaderEpoch, buffer.capacity())
+) {
+builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+}
+}
+
+public static MemoryRecords withVotersRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+VotersRecord votersRecord
+) {
+writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, 
votersRecord);

Review Comment:
   Yeah, I see that. Looks like this is an existing issue with the existing control 
record builders. Let me fix the ones that are specific to KRaft; we can fix 
the other ones in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-05-01 Thread via GitHub


TaiJuWu commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1586484285


##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.annotation.ClusterTemplate;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import java.util.function.Consumer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ClusterTestExtensionsUnitTest {
+@Test
+@SuppressWarnings("unchecked")
+void testProcessClusterTemplate() {
+ClusterTestExtensions ext = new ClusterTestExtensions();
+ExtensionContext context = mock(ExtensionContext.class);
+Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);
+ClusterTemplate annot = mock(ClusterTemplate.class);
+when(annot.value()).thenReturn("");

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-05-01 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16272:
--
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-05-01 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16272:
--
Priority: Major  (was: Blocker)

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-05-01 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, regex
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.
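
For context, a minimal sketch of the intended client-side usage, assuming the SubscriptionPattern class and the subscribe(SubscriptionPattern) overload described in KIP-848 (not available in released clients when this issue was filed):

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.SubscriptionPattern;

public class SubscriptionPatternSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-group");
        // Broker-side regex resolution only applies to the KIP-848 protocol.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The regex is carried in the heartbeat's SubscribedTopicRegex field,
            // so matching against topics happens on the broker, not the client.
            consumer.subscribe(new SubscriptionPattern("orders-.*"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}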



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

