Re: [PR] KAFKA-16014: add RemoteLogMetadataCount metric [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15026:
URL: https://github.com/apache/kafka/pull/15026#issuecomment-1858739478

   @clolov @satishd , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16014: add RemoteLogMetadataCount metric [kafka]

2023-12-15 Thread via GitHub


showuon opened a new pull request, #15026:
URL: https://github.com/apache/kafka/pull/15026

   Add `RemoteLogMetadataCount` count metric as described in KIP-963.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1858738944

   > The following metrics are related to each other:
   > 
   > 1. RemoteCopyLagBytes and RemoteCopyLagSegments
   > 
   > 2. RemoteDeleteLagBytes and RemoteDeleteLagSegments
   > 
   > 
   > We should update their values in-sync by taking a lock. Now, both the 
values are being updated individually. There will be a deviation when the 
metric-reporter-thread reports them in the background.
   > 
   > (eg) RemoteCopyLagBytes can be shown as 0 but RemoteCopyLagSegments can be 
shown as 1 in the Grafana dashboard. Shall we fix the discrepancy?
   
   This is a good point, but I think adding a lock is overkill for a 
discrepancy this small. A brief deviation between the values seems acceptable 
compared with the performance cost of taking a lock.
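
A lock-free middle ground can be sketched as follows. This is only an illustration and is not taken from the PR: the class names `RemoteLagSnapshot` and `RemoteCopyLag` are hypothetical, and the sketch ignores the per-partition aggregation that the real metrics perform. The idea is to publish both values as one immutable pair, so that a single read never mixes bytes from one update with segments from another.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable pair of related lag values; not part of Kafka. */
final class RemoteLagSnapshot {
    final long lagBytes;
    final long lagSegments;

    RemoteLagSnapshot(long lagBytes, long lagSegments) {
        this.lagBytes = lagBytes;
        this.lagSegments = lagSegments;
    }
}

/** Hypothetical holder that swaps the whole pair atomically. */
final class RemoteCopyLag {
    private final AtomicReference<RemoteLagSnapshot> current =
            new AtomicReference<>(new RemoteLagSnapshot(0L, 0L));

    /** Writer side: both values become visible in one reference swap. */
    void record(long lagBytes, long lagSegments) {
        current.set(new RemoteLagSnapshot(lagBytes, lagSegments));
    }

    /** Reader side: one read returns a pair that was written together. */
    RemoteLagSnapshot snapshot() {
        return current.get();
    }
}
```

The pair is only consistent within a single `snapshot()` call. If a reporter reads two separate gauges, each calling `snapshot()` on its own, an update can still land between the two reads, so this narrows the window rather than removing it.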





Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-15 Thread via GitHub


philipnee commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1428703397


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1071,18 +1071,25 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
 public void testUpdatePartitionLeadership() {
 Time time = new MockTime();

Review Comment:
   We probably need to refactor this test at some point.  I tried to do it, 
but as soon as I moved something, we exceeded the 160-line limit. 






[PR] KAFKA-16023: Disable flaky tests [kafka]

2023-12-15 Thread via GitHub


philipnee opened a new pull request, #15025:
URL: https://github.com/apache/kafka/pull/15025

   I observed several failed tests in PR builds.  Let's first disable them and 
try to find a different way to test the async consumer with these tests.
   
   cc @dajac





[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16023:
---
Description: 
Several tests in PlaintextConsumerTest.scala (such as 
testPerPartitionLagMetricsCleanUpWithSubscribe) use:

assertEquals(1, listener.callsToAssigned, "should be assigned once")

However, the timing of reconciliation completion is not deterministic because 
of asynchronous processing, so we actually need to wait until the condition 
holds.

Another issue is the timeout: some of these tasks might not complete 
within the 600ms timeout, which makes the tests flaky.

  was:
Several tests in PlaintextConsumerTest.scala (such as 
testPerPartitionLagMetricsCleanUpWithSubscribe) use:

assertEquals(1, listener.callsToAssigned, "should be assigned once")

However, the timing of reconciliation completion is not deterministic because 
of asynchronous processing, so we actually need to wait until the condition 
holds.


> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) uses:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, as the timing for reconciliation completion is not deterministic due 
> to asynchronous processing. We actually need to wait until the condition to 
> happen.
> However, another issue is the timeout - some of these tasks might not 
> complete within the 600ms timeout, so the tests are deemed to be flaky.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16023:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) uses:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, as the timing for reconciliation completion is not deterministic due 
> to asynchronous processing. We actually need to wait until the condition to 
> happen.





[jira] [Created] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16023:
--

 Summary: PlaintextConsumerTest needs to wait for reconciliation to 
complete before proceeding
 Key: KAFKA-16023
 URL: https://issues.apache.org/jira/browse/KAFKA-16023
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


Several tests in PlaintextConsumerTest.scala (such as 
testPerPartitionLagMetricsCleanUpWithSubscribe) use:

assertEquals(1, listener.callsToAssigned, "should be assigned once")

However, the timing of reconciliation completion is not deterministic because 
of asynchronous processing, so we actually need to wait until the condition 
holds.
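
The "wait until the condition holds" approach can be sketched with a small polling helper. This is a minimal illustration in plain Java, not the actual fix: `Await` and `waitUntil` are hypothetical names, and the timeout and poll interval are assumptions a real test would need to tune.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; the names here are illustrative only. */
final class Await {
    private Await() { }

    /**
     * Polls the condition until it holds or timeoutMs elapses.
     * Returns true if the condition was observed to hold, false on
     * timeout or if the waiting thread is interrupted.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test would then assert on the result, for example `assertTrue(Await.waitUntil(() -> listener.callsToAssigned == 1, 10_000, 50), "should be assigned once")`, instead of asserting the count immediately. Kafka's own test utilities provide comparable wait helpers, which would be the natural choice in the real test.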





[jira] [Created] (KAFKA-16022) AsyncKafkaConsumer sometimes complains "No current assignment for partition {}"

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16022:
--

 Summary: AsyncKafkaConsumer sometimes complains "No current 
assignment for partition {}"
 Key: KAFKA-16022
 URL: https://issues.apache.org/jira/browse/KAFKA-16022
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


This seems to be a timing issue: before the member receives any assignment 
from the coordinator, the fetcher tries to find the current position, causing 
"No current assignment for partition {}".  This adds a small amount of noise 
to the log.

 

 





[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains "No current assignment for partition {}"

2023-12-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16022:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> AsyncKafkaConsumer sometimes complains "No current assignment for partition 
> {}"
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Minor
>
> This seems to be a timing issue that before the member receives any 
> assignment from the coordinator, the fetcher will try to find the current 
> position causing "No current assignment for partition {}".  This creates a 
> small amount of noise to the log.
>  
>  





Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


kamalcph commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428646042


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -926,16 +926,27 @@ private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earl
 return isSegmentDeleted;
 }
 
-private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate predicate)
+private boolean deleteRemoteLogSegment(String topic, 
RemoteLogSegmentMetadata segmentMetadata, Predicate 
predicate)

Review Comment:
   Can we take the `topic` name from the segmentMetadata instead of propagating 
it from the caller?
   
   ```
   String topic = segmentMetadata.topicIdPartition().topic();
   ```






Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


kamalcph commented on code in PR #15005:
URL: https://github.com/apache/kafka/pull/15005#discussion_r1428638499


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1073,13 +1088,28 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 .iterator();
 while (epochsToClean.hasNext()) {
 int epoch = epochsToClean.next();
+List listOfSegmentsToBeCleaned = 
new ArrayList<>();
 Iterator segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
 while (segmentsToBeCleaned.hasNext()) {
 if (isCancelled() || !isLeader()) {
 return;
+} else {
+RemoteLogSegmentMetadata nextSegmentMetadata = 
segmentsToBeCleaned.next();
+sizeOfDeletableSegmentsBytes += 
nextSegmentMetadata.segmentSizeInBytes();
+listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
 }
+}
+segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
+brokerTopicMetrics.recordRemoteDeleteBytesLag(partition, 
sizeOfDeletableSegmentsBytes);
+
brokerTopicMetrics.recordRemoteDeleteSegmentsLag(partition, 
segmentsLeftToDelete);
+for (RemoteLogSegmentMetadata segmentMetadata : 
listOfSegmentsToBeCleaned) {
 // No need to update the log-start-offset even though 
the segment is deleted as these epochs/offsets are earlier to that value.
-
remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentsToBeCleaned.next());
+if 
(remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentMetadata)) {

Review Comment:
   Why are we changing the deletion behaviour? When this replica is not the 
leader, we want to skip the segment deletion.
   
   Do we need to record `remoteDeleteLagBytes` and 
`remoteDeleteLagSegments` for the `deletion-due-to-leader-epoch-cache-truncation` 
case? The deletion happens synchronously, so those metrics may not be relevant. 
   
   For the cases below, recording the deletion-lag metrics seems valid since 
there can be some delay between marking the eligible segments and actually 
deleting them:
   
   1. breachByLogStartOffset
   2. breachByRetentionTime and
   3. breachByRetentionSize



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -524,7 +567,7 @@ class BrokerTopicStats(configOpt: 
java.util.Optional[KafkaConfig] = java.util.Op
   
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
   
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
   
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
-  
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
+  
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)

Review Comment:
   Shall we close the `RemoteCopyLagSegments`, `RemoteDeleteLagBytes`, and 
`RemoteDeleteLagSegments` metrics here?



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -397,16 +400,56 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
   def invalidOffsetOrSequenceRecordsPerSec: Meter = 
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
   def recordRemoteCopyBytesLag(partition: Int, bytesLag: Long): Unit = {

Review Comment:
   Subjective: Please consider renaming all the method/metric names to be 
consistent:
   
   ```
   MetricName -> MethodName
   RemoteCopyLagBytes -> recordRemoteCopyLagBytes()
   ```
   
   (or)
   
   ```
   MethodName -> MetricName
   recordRemoteCopyBytesLag() -> RemoteCopyBytesLag
   ```
   It brings clarity to the code. 






Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


kamalcph commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1858669208

   The following metrics are related to each other:
   
   1. RemoteCopyBytesLag and RemoteCopySegmentsLag
   2. RemoteDeleteBytesLag and RemoteDeleteSegmentsLag
   
   We should update their values in-sync by taking a lock. Now, both the values 
are being updated individually. There will be a deviation when the 
metric-reporter-thread reports them in the background.
   
   (eg) RemoteCopyBytesLag can be shown as 0 but RemoteCopySegmentsLag can be 
shown as 1 in the Grafana dashboard. Shall we fix the discrepancy?





Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah merged PR #15007:
URL: https://github.com/apache/kafka/pull/15007





Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah commented on PR #15007:
URL: https://github.com/apache/kafka/pull/15007#issuecomment-1858638896

   Test failures look unrelated





Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #14375:
URL: https://github.com/apache/kafka/pull/14375#issuecomment-1858633479

   There is a compilation failure in the Java 8 / Scala 2.12 build. Please help fix it.
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14375/16/pipeline





Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]

2023-12-15 Thread via GitHub


srdo commented on PR #15024:
URL: https://github.com/apache/kafka/pull/15024#issuecomment-1858616582

   I couldn't spot any testing that deals specifically with the behavior when 
the encoding can't be looked up. This change will alter that behavior so people 
will get a failure earlier during client initialization, instead of when 
messages are received/sent.
   
   I'm hoping such a change is acceptable (let me know if I need to add tests).





[PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]

2023-12-15 Thread via GitHub


srdo opened a new pull request, #15024:
URL: https://github.com/apache/kafka/pull/15024

   The encoding for a StringSerializer doesn't change during the life of the 
serializer, so there isn't a good reason to look up the corresponding Charset 
every time a String needs to be serialized, which is what the getBytes(String) 
method does.
   
   Looking it up once on construction is more efficient.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16021) Eagerly load Charset for StringSerializer/StringDeserializer

2023-12-15 Thread Jira
Stig Rohde Døssing created KAFKA-16021:
--

 Summary: Eagerly load Charset for 
StringSerializer/StringDeserializer
 Key: KAFKA-16021
 URL: https://issues.apache.org/jira/browse/KAFKA-16021
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 3.6.1
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


StringSerializer and StringDeserializer currently use the String constructor 
and getBytes methods that take the encoding as a String, necessitating a 
Charset lookup every time those methods are called.

We could save that lookup by performing it once during the serializer configure 
call instead.
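
A minimal sketch of the idea, under stated assumptions: `EagerCharsetStringSerializer` is a stand-in class written for illustration, not Kafka's `StringSerializer`, and the `serializer.encoding` key is used here only as an example config name. It resolves the `Charset` once in `configure` and reuses it on every call.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Illustrative only: a stand-in class, not Kafka's StringSerializer. */
final class EagerCharsetStringSerializer {
    // Resolved once; stays UTF-8 unless configure() supplies an encoding.
    private Charset charset = StandardCharsets.UTF_8;

    /**
     * "serializer.encoding" is an assumed key for this sketch.
     * Charset.forName throws an unchecked exception for an unknown or
     * illegal name, so a bad encoding fails here, at configuration time.
     */
    void configure(Map<String, ?> configs) {
        Object encoding = configs.get("serializer.encoding");
        if (encoding instanceof String) {
            charset = Charset.forName((String) encoding);
        }
    }

    byte[] serialize(String data) {
        // getBytes(Charset) avoids the per-call name lookup of getBytes(String).
        return data == null ? null : data.getBytes(charset);
    }
}
```

One visible consequence of this design is that an unsupported encoding name surfaces as an exception from `configure` rather than from the first `serialize` call.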





Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]

2023-12-15 Thread via GitHub


ableegoldman commented on code in PR #14949:
URL: https://github.com/apache/kafka/pull/14949#discussion_r1428577334


##
docs/streams/upgrade-guide.html:
##
@@ -185,6 +185,13 @@ Streams API
 
 
 
+
+default.dsl.store was replaced with 
dsl.store.suppliers.class to allow for custom store configuration.

Review Comment:
   ```suggestion
   The default.dsl.store config was deprecated in favor of 
the new
   dsl.store.suppliers.class config to allow for custom 
state store 
   implementations to be configured as the default.
   
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -310,6 +310,19 @@ num.standby.replicasDefault serializer/deserializer for the inner 
class of windowed values, implementing the Serde interface.
 null
   
+  default.dsl.store
+Low
+(Deprecated) The default state store type used by 
DSL operators.
+ROCKS_DB
+  
+  dsl.store.suppliers.class
+Low
+
+  Defines which store implementations to plug in to DSL operators. 
Must implement

Review Comment:
   ```suggestion
 Defines a default state store implementation to be used by any 
stateful DSL operator
 that has not explicitly configured the store implementation 
type. Must implement
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -310,6 +310,19 @@ num.standby.replicasDefault serializer/deserializer for the inner 
class of windowed values, implementing the Serde interface.
 null
   
+  default.dsl.store
+Low
+(Deprecated) The default state store type used by 
DSL operators.

Review Comment:
   ```suggestion
   
 [DEPRECATED] The default state store type used by DSL 
operators. Deprecated in
 favor of dsl.store.suppliers.class
 
   ```






[PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-15 Thread via GitHub


philipnee opened a new pull request, #15023:
URL: https://github.com/apache/kafka/pull/15023

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.16.0 [kafka]

2023-12-15 Thread via GitHub


bmscomp commented on code in PR #13662:
URL: https://github.com/apache/kafka/pull/13662#discussion_r1428518449


##
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##
@@ -143,7 +143,7 @@ public class JsonConverter implements Converter, 
HeaderConverter, Versioned {
 // names specified in the field
 private static final HashMap 
LOGICAL_CONVERTERS = new HashMap<>();
 
-private static final JsonNodeFactory JSON_NODE_FACTORY = 
JsonNodeFactory.withExactBigDecimals(true);
+private static final JsonNodeFactory JSON_NODE_FACTORY = new 
JsonNodeFactory(true);

Review Comment:
   @mimaison  Sorry I missed answering this point. The change was made because 
the static method is deprecated:
   
   ```
   /**
* Return a factory instance with the desired behavior for BigDecimals
* See {@link #JsonNodeFactory(boolean)} for a full description.
*
* @param bigDecimalExact If {code true} DISABLE normalization of {@link 
BigDecimal} values;
*if {code false} ENABLE normalization
* @return a factory instance with specified configuration
*
* @deprecated Use {@link 
com.fasterxml.jackson.databind.cfg.JsonNodeFeature#STRIP_TRAILING_BIGDECIMAL_ZEROES}
*   instead for configuring behavior.
*/
   @Deprecated
   public static JsonNodeFactory withExactBigDecimals(boolean 
bigDecimalExact)
   {
   return new JsonNodeFactory(bigDecimalExact);
   }

   ```
   and the best available replacement for it is the constructor:
   
   ```
   /**
* @param bigDecimalExact see Class description on "BigDecimal 
normalization"
*
* @see BigDecimal
*/
   public JsonNodeFactory(boolean bigDecimalExact)
   {
   _cfgBigDecimalExact = bigDecimalExact;
   }
   
   ```
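
For context on what the `bigDecimalExact` flag governs: the Javadoc quoted above says `true` disables normalization, and its deprecation note points at `STRIP_TRAILING_BIGDECIMAL_ZEROES`, so normalization here refers to stripping trailing zeros. The sketch below shows that effect with plain `java.math.BigDecimal` only. It does not call Jackson, and `BigDecimalNormalizationDemo` is a hypothetical name.

```java
import java.math.BigDecimal;

/** Hypothetical demo class; uses only java.math, not Jackson. */
final class BigDecimalNormalizationDemo {
    /** Mimics "normalization": trailing zeros are stripped from the value. */
    static BigDecimal normalized(BigDecimal value) {
        return value.stripTrailingZeros();
    }

    public static void main(String[] args) {
        BigDecimal exact = new BigDecimal("1.500");
        System.out.println(exact);             // prints 1.500
        System.out.println(normalized(exact)); // prints 1.5
    }
}
```

Passing `true` to the factory, as the diff does, corresponds to keeping the first form, which preserves the scale of the decimal as written.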






Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.16.0 [kafka]

2023-12-15 Thread via GitHub


bmscomp commented on code in PR #13662:
URL: https://github.com/apache/kafka/pull/13662#discussion_r1428518449


##
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##
@@ -143,7 +143,7 @@ public class JsonConverter implements Converter, 
HeaderConverter, Versioned {
 // names specified in the field
 private static final HashMap 
LOGICAL_CONVERTERS = new HashMap<>();
 
-private static final JsonNodeFactory JSON_NODE_FACTORY = 
JsonNodeFactory.withExactBigDecimals(true);
+private static final JsonNodeFactory JSON_NODE_FACTORY = new 
JsonNodeFactory(true);

Review Comment:
   @mimaison  Sorry I missed answering this point. The change was made because 
the static method is deprecated:
   
   ```
   @Deprecated
   public static JsonNodeFactory withExactBigDecimals(boolean 
bigDecimalExact) {
   return new JsonNodeFactory(bigDecimalExact);
   }

   ```
   and the best available replacement for it is the constructor:
   
   ```
   /**
* Return a factory instance with the desired behavior for BigDecimals
* See {@link #JsonNodeFactory(boolean)} for a full description.
*
* @param bigDecimalExact If {code true} DISABLE normalization of {@link 
BigDecimal} values;
*if {code false} ENABLE normalization
* @return a factory instance with specified configuration
*
* @deprecated Use {@link 
com.fasterxml.jackson.databind.cfg.JsonNodeFeature#STRIP_TRAILING_BIGDECIMAL_ZEROES}
*   instead for configuring behavior.
*/
   @Deprecated
   public static JsonNodeFactory withExactBigDecimals(boolean 
bigDecimalExact)
   {
   return new JsonNodeFactory(bigDecimalExact);
   }
   ```






Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.16.0 [kafka]

2023-12-15 Thread via GitHub


bmscomp commented on code in PR #13662:
URL: https://github.com/apache/kafka/pull/13662#discussion_r1428518449


##
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##
@@ -143,7 +143,7 @@ public class JsonConverter implements Converter, 
HeaderConverter, Versioned {
 // names specified in the field
 private static final HashMap 
LOGICAL_CONVERTERS = new HashMap<>();
 
-private static final JsonNodeFactory JSON_NODE_FACTORY = 
JsonNodeFactory.withExactBigDecimals(true);
+private static final JsonNodeFactory JSON_NODE_FACTORY = new 
JsonNodeFactory(true);

Review Comment:
   @mimaison  Sorry I missed answering this point. The change was made because 
the static method is deprecated:
   
   ```
   @Deprecated
   public static JsonNodeFactory withExactBigDecimals(boolean 
bigDecimalExact) {
   return new JsonNodeFactory(bigDecimalExact);
   }

   ```
   and the best available replacement for it is the constructor:
   
   ```
   public JsonNodeFactory(boolean bigDecimalExact) {
   this._cfgBigDecimalExact = bigDecimalExact;
   }
   ```






Re: [PR] MINOR: Minor update patch versions [kafka]

2023-12-15 Thread via GitHub


jolshan merged PR #14641:
URL: https://github.com/apache/kafka/pull/14641





Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]

2023-12-15 Thread via GitHub


rondagostino commented on PR #14977:
URL: https://github.com/apache/kafka/pull/14977#issuecomment-1858462616

   @viktorsomogyi I'm seeing these `kafka.server.LogDirFailureTest ` failures, 
reproducible locally as well as on the build:
   ```
   testIOExceptionDuringLogRoll(String).quorum=kraft
   testIOExceptionDuringCheckpoint(String).quorum=kraft
   ```
   





[jira] [Updated] (KAFKA-15365) Broker-side replica management changes

2023-12-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15365:
--
Fix Version/s: 3.7.0

> Broker-side replica management changes
> --
>
> Key: KAFKA-15365
> URL: https://issues.apache.org/jira/browse/KAFKA-15365
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Omnia Ibrahim
>Priority: Major
> Fix For: 3.7.0
>
>
> On the broker side, process metadata changes to partition directories as the 
> broker catches up to metadata, as described in KIP-858 under "Replica 
> management".
>  
>  





[jira] [Updated] (KAFKA-15358) QueuedReplicaToDirAssignments metric

2023-12-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15358:
--
Fix Version/s: 3.7.0

> QueuedReplicaToDirAssignments metric
> 
>
> Key: KAFKA-15358
> URL: https://issues.apache.org/jira/browse/KAFKA-15358
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Michael Westerby
>Priority: Major
> Fix For: 3.7.0
>
>






[jira] [Updated] (KAFKA-15471) Allow independently stop KRaft controllers or brokers

2023-12-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15471:
--
Fix Version/s: 3.7.0

> Allow independently stop KRaft controllers or brokers
> -
>
> Key: KAFKA-15471
> URL: https://issues.apache.org/jira/browse/KAFKA-15471
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hailey Ni
>Assignee: Hailey Ni
>Priority: Major
> Fix For: 3.7.0
>
>
> Some users run KRaft controllers and brokers on the same machine (not 
> containerized, but through tarballs, etc).  Prior to KRaft, when running 
> ZooKeeper and Kafka on the same machine, users could independently stop the 
> ZooKeeper node and Kafka broker since there were specific shell scripts for 
> each (zookeeper-server-stop and kafka-server-stop, respectively).
> However in KRaft mode, they can't stop the KRaft controllers independently 
> from the Kafka brokers because there is just a single script that doesn't 
> distinguish between the two processes and signals both of them. We need to 
> provide a way for users to kill either controllers or brokers.





[jira] [Resolved] (KAFKA-15471) Allow independently stop KRaft controllers or brokers

2023-12-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-15471.
---
Resolution: Fixed

> Allow independently stop KRaft controllers or brokers
> -
>
> Key: KAFKA-15471
> URL: https://issues.apache.org/jira/browse/KAFKA-15471
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hailey Ni
>Assignee: Hailey Ni
>Priority: Major
>
> Some users run KRaft controllers and brokers on the same machine (not 
> containerized, but through tarballs, etc).  Prior to KRaft, when running 
> ZooKeeper and Kafka on the same machine, users could independently stop the 
> ZooKeeper node and Kafka broker since there were specific shell scripts for 
> each (zookeeper-server-stop and kafka-server-stop, respectively).
> However in KRaft mode, they can't stop the KRaft controllers independently 
> from the Kafka brokers because there is just a single script that doesn't 
> distinguish between the two processes and signals both of them. We need to 
> provide a way for users to kill either controllers or brokers.





[jira] [Created] (KAFKA-16020) Time#waitForFuture should tolerate nanosecond overflow

2023-12-15 Thread David Arthur (Jira)
David Arthur created KAFKA-16020:


 Summary: Time#waitForFuture should tolerate nanosecond overflow
 Key: KAFKA-16020
 URL: https://issues.apache.org/jira/browse/KAFKA-16020
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur


Reported by [~jsancio] here 
https://github.com/apache/kafka/pull/15007#discussion_r1428359211

Time#waitForFuture should follow the JDK recommendation for comparing elapsed 
nanoseconds to a duration.

https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/System.html#nanoTime()

{quote}
For example, to measure how long some code takes to execute:

 
 long startTime = System.nanoTime();
 // ... the code being measured ...
 long elapsedNanos = System.nanoTime() - startTime;
To compare elapsed time against a timeout, use

 
 if (System.nanoTime() - startTime >= timeoutNanos) ...
instead of
 
 if (System.nanoTime() >= startTime + timeoutNanos) ...
because of the possibility of numerical overflow.
{quote}
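
The difference can be illustrated with a minimal sketch. This is not the actual Time#waitForFuture code; the class and method names are hypothetical, and the timestamps are passed in as plain longs so the overflow case can be reproduced without waiting for the clock.

```java
final class NanoTimeoutCheck {
    // Overflow-safe form recommended by the JDK docs: the subtraction wraps
    // consistently, so the elapsed value is correct as long as the real
    // elapsed time is under roughly 292 years.
    static boolean hasTimedOutSafe(long nowNanos, long startNanos, long timeoutNanos) {
        return nowNanos - startNanos >= timeoutNanos;
    }

    // Overflow-prone form: startNanos + timeoutNanos can wrap around to a
    // negative deadline, which makes the check fire immediately.
    static boolean hasTimedOutUnsafe(long nowNanos, long startNanos, long timeoutNanos) {
        return nowNanos >= startNanos + timeoutNanos;
    }
}
```

With a start value close to Long.MAX_VALUE, the second form reports a timeout after only a few nanoseconds, while the first form still waits for the full duration.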





Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1428410182


##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -41,15 +41,17 @@ public static class Builder {
 private final Map counts = new 
HashMap<>();
 private int batches = 0;
 private int total = 0;
+private long batchDurationsNs = 0;
 private long endTimeNanos = 0;
 
 Builder(Time time) {
 this.time = time;
 this.startTimeNanos = time.nanoseconds();
 }
 
-public void acceptBatch(List recordBatch) {
+public void acceptBatch(List recordBatch, long 
durationNs) {
 batches++;
+batchDurationsNs += durationNs;

Review Comment:
   Yup, that's correct.






[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2023-12-15 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797281#comment-17797281
 ] 

Sabit commented on KAFKA-12679:
---

Would it be possible to add some logging as to which thread owns the lock when 
this happens? We are currently experiencing this issue repeatedly on one 
client, and it would be helpful to understand which other streams thread is the 
lock owner in case that thread is behaving unexpectedly. We are on streams 
3.4.x and not using the stateUpdater.

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
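
The fix described in the issue, sleeping between failed lock attempts instead of retrying in a tight loop, can be sketched roughly as follows. This is not the attached patch or the Kafka Streams code; the class name, the method, and the BooleanSupplier standing in for the directory lock attempt are all hypothetical.

```java
import java.util.function.BooleanSupplier;

final class LockRetryWithBackoff {
    /**
     * Retries tryLock until it succeeds or maxAttempts is reached, sleeping
     * backoffMs between failed attempts so the retry loop does not spin and
     * starve the thread that still holds the lock.
     *
     * Returns the number of attempts made, or -1 if the lock was never
     * acquired or the thread was interrupted while backing off.
     */
    static int acquireWithBackoff(BooleanSupplier tryLock, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryLock.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

A fixed backoff is used here for brevity; whether the real fix uses a fixed or growing delay is not stated in this thread.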





[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invokes and verify the consumer callback

2023-12-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16019:
---
Description: 
I was running the PlaintextConsumer to test the async consumer; however, a few 
tests were failing because they could not verify that the listener was invoked 
correctly.

For example, `testPerPartitionLeadMetricsCleanUpWithSubscribe`.

Around 50% of the time, the listener's callsToAssigned was never incremented 
correctly. Even after changing it to awaitUntilTrue, the result was the same.
{code:java}
consumer.subscribe(List(topic, topic2).asJava, listener)
val records = awaitNonEmptyRecords(consumer, tp)
assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}

  was:
I was running the PlaintextConsumer to test the async consumer; however, a few 
tests were failing because they could not verify that the listener was invoked 
correctly.

For example, `testPerPartitionLeadMetricsCleanUpWithSubscribe`.

Around 50% of the time, the listener's callsToAssigned was never incremented 
correctly. Even after changing it to awaitUntilTrue, the result was the same.
consumer.subscribe(List(topic, topic2).asJava, listener)
val records = awaitNonEmptyRecords(consumer, tp)
assertEquals(1, listener.callsToAssigned, "should be assigned once")


> Some of the tests in PlaintextConsumer can't seem to deterministically 
> invokes and verify the consumer callback
> ---
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
>
> I was running the PlaintextConsumer to test the async consumer; however, a 
> few tests were failing because they could not verify that the listener was 
> invoked correctly.
> For example, `testPerPartitionLeadMetricsCleanUpWithSubscribe`.
> Around 50% of the time, the listener's callsToAssigned was never incremented 
> correctly. Even after changing it to awaitUntilTrue, the result was the same.
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}





[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invokes and verify the consumer callback

2023-12-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16019:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Some of the tests in PlaintextConsumer can't seem to deterministically 
> invokes and verify the consumer callback
> ---
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
>
> I was running the PlaintextConsumer to test the async consumer; however, a 
> few tests were failing because they could not verify that the listener was 
> invoked correctly.
> For example, `testPerPartitionLeadMetricsCleanUpWithSubscribe`.
> Around 50% of the time, the listener's callsToAssigned was never incremented 
> correctly. Even after changing it to awaitUntilTrue, the result was the same.
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once")





[jira] [Created] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invokes and verify the consumer callback

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16019:
--

 Summary: Some of the tests in PlaintextConsumer can't seem to 
deterministically invokes and verify the consumer callback
 Key: KAFKA-16019
 URL: https://issues.apache.org/jira/browse/KAFKA-16019
 Project: Kafka
  Issue Type: Task
Reporter: Philip Nee


I was running the PlaintextConsumer to test the async consumer; however, a few 
tests were failing because they could not verify that the listener was invoked 
correctly.

For example, `testPerPartitionLeadMetricsCleanUpWithSubscribe`.

Around 50% of the time, the listener's callsToAssigned was never incremented 
correctly. Even after changing it to awaitUntilTrue, the result was the same.
consumer.subscribe(List(topic, topic2).asJava, listener)
val records = awaitNonEmptyRecords(consumer, tp)
assertEquals(1, listener.callsToAssigned, "should be assigned once")





Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


jsancio commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1428364538


##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -41,15 +41,17 @@ public static class Builder {
 private final Map counts = new 
HashMap<>();
 private int batches = 0;
 private int total = 0;
+private long batchDurationsNs = 0;
 private long endTimeNanos = 0;
 
 Builder(Time time) {
 this.time = time;
 this.startTimeNanos = time.nanoseconds();
 }
 
-public void acceptBatch(List recordBatch) {
+public void acceptBatch(List recordBatch, long 
durationNs) {
 batches++;
+batchDurationsNs += durationNs;

Review Comment:
   Okay. This is measuring how much time the ZK migration spent in the 
controller, writing and committing the batches to KRaft, right?






Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


jsancio commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1428347367


##
metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A record batch consumer that merges incoming batches into batches of a 
minimum a given size. It does so
+ * by buffering the records into an array that is later flushed to a 
downstream consumer. Batches consumed
+ * by this class will not be broken apart, only combined with other batches to 
reach the minimum batch size.
+ * 
+ * Note that {@link #flush()} must be called after the last batch has been 
accepted in order to flush any
+ * buffered records.
+ */
+public class BufferingBatchConsumer<T> implements Consumer<List<T>> {
+
+private final Consumer<List<T>> delegateConsumer;
+private final List<T> bufferedBatch;
+private final int minBatchSize;
+
+BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int 
minBatchSize) {
+this.delegateConsumer = delegateConsumer;
+this.bufferedBatch = new ArrayList<>(minBatchSize);
+this.minBatchSize = minBatchSize;
+}
+
+@Override
+public void accept(List<T> batch) {
+bufferedBatch.addAll(batch);
+if (bufferedBatch.size() >= minBatchSize) {
+delegateConsumer.accept(new ArrayList<>(bufferedBatch));
+bufferedBatch.clear();

Review Comment:
   How about this implementation?
   ```java
   delegateConsumer.accept(bufferedBatch);
   bufferedBatch = new ArrayList<>(minBatchSize);
   ```
   
   Similar in `flush`. There seems to be some code duplication between these 
two methods.
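
One way the suggestion above might look once the duplication between `accept` and `flush` is factored out is sketched below. This is only an illustration, not the code in the PR: the class name `BufferingConsumerSketch`, the private `emit` helper, and the generic parameter `T` are assumptions. Note that the buffer field has to be non-final for the hand-off to work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BufferingConsumerSketch<T> implements Consumer<List<T>> {
    private final Consumer<List<T>> delegate;
    private final int minBatchSize;
    private List<T> buffered; // non-final so it can be swapped on hand-off

    BufferingConsumerSketch(Consumer<List<T>> delegate, int minBatchSize) {
        this.delegate = delegate;
        this.minBatchSize = minBatchSize;
        this.buffered = new ArrayList<>(minBatchSize);
    }

    @Override
    public void accept(List<T> batch) {
        // Incoming batches are never split, only merged into the buffer.
        buffered.addAll(batch);
        if (buffered.size() >= minBatchSize) {
            emit();
        }
    }

    /** Must be called after the last batch to push out any remainder. */
    public void flush() {
        if (!buffered.isEmpty()) {
            emit();
        }
    }

    // Shared by accept() and flush(): hand the current list downstream
    // without copying it, then start a fresh buffer.
    private void emit() {
        List<T> toSend = buffered;
        buffered = new ArrayList<>(minBatchSize);
        delegate.accept(toSend);
    }
}
```

Handing over the list and allocating a new one avoids the copy made by `new ArrayList<>(bufferedBatch)`, at the cost of giving the downstream consumer ownership of the list.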



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -645,6 +648,29 @@ public void run() throws Exception {
 }
 }
 
+private BufferingBatchConsumer 
buildMigrationBatchConsumer(
+MigrationManifest.Builder manifestBuilder
+) {
+return new BufferingBatchConsumer<>(batch -> {
+try {
+if (log.isTraceEnabled()) {
+batch.forEach(apiMessageAndVersion ->
+log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+}
+CompletableFuture future = 
zkRecordConsumer.acceptBatch(batch);
+long batchStart = time.nanoseconds();
+FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",

Review Comment:
   That's fair. I keep forgetting that we still need to support Java 8. Looking 
forward to 4.x.



##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -41,15 +41,17 @@ public static class Builder {
 private final Map counts = new 
HashMap<>();
 private int batches = 0;
 private int total = 0;
+private long batchDurationsNs = 0;
 private long endTimeNanos = 0;
 
 Builder(Time time) {
 this.time = time;
 this.startTimeNanos = time.nanoseconds();
 }
 
-public void acceptBatch(List recordBatch) {
+public void acceptBatch(List recordBatch, long 
durationNs) {
 batches++;
+batchDurationsNs += durationNs;

Review Comment:
   Okay. This is measuring how much time the ZK migration spent in the 
controller, writing and committing the batches to KRaft, right?



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -645,6 +648,29 @@ public void run() throws Exception {
 }
 }
 
+private BufferingBatchConsumer 
buildMigrationBatchConsumer(
+MigrationManifest.Builder manifestBuilder
+) {
+return new BufferingBatchConsumer<>(batch -> {
+try {
+if (log.isTraceEnabled()) {
+batch.forEach(apiMessageAndVersion ->
+

[PR] KAFKA-16003: Always create the /config/topics ZNode even for topics w… [kafka]

2023-12-15 Thread via GitHub


mimaison opened a new pull request, #15022:
URL: https://github.com/apache/kafka/pull/15022

   …ithout configs
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2023-12-15 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1858303193

   @philipnee, all builds succeeded and the test failures look unrelated. PTAL, thanks.





[jira] [Created] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method

2023-12-15 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-16018:


 Summary: KafkaStreams can go into a zombie state if 
UncaughtExceptionHandler is specified via the deprecated method
 Key: KAFKA-16018
 URL: https://issues.apache.org/jira/browse/KAFKA-16018
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Tommy Becker


We have a streams application in which all StreamThreads died due to a lack of 
disk space. To our surprise, the KafkaStreams instance still reported its state 
as running. Upon further investigation, it appears this is due to the 
application setting an UncaughtExceptionHandler via the deprecated method (this 
application was recently upgraded from 2.4.1): 
[https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)]

The only way a StreamThread failure can cause the KafkaStreams instance to 
transition to an error state now is via the new StreamsUncaughtExceptionHandler 
machinery, but when an old UncaughtExceptionHandler is set by the old method 
this code is effectively bypassed.





Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-12-15 Thread via GitHub


jolshan merged PR #14702:
URL: https://github.com/apache/kafka/pull/14702





Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-12-15 Thread via GitHub


jolshan commented on PR #14702:
URL: https://github.com/apache/kafka/pull/14702#issuecomment-1858246311

   Given that this failed on a storage test (storage tests shouldn't be 
affected here), and that it was able to build on some of the many other runs I 
tried, I am going to merge this after 12 rebuilds.  





Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]

2023-12-15 Thread via GitHub


jolshan commented on PR #14702:
URL: https://github.com/apache/kafka/pull/14702#issuecomment-1858240819

   I'm convinced this PR is cursed. Still failing builds. :( 





Re: [PR] MINOR: Minor update patch versions [kafka]

2023-12-15 Thread via GitHub


jolshan commented on PR #14641:
URL: https://github.com/apache/kafka/pull/14641#issuecomment-1858231854

   Reran and the majority of the 2.6.3 and 2.7.2 tests passed. The ones that 
didn't had a different issue.





Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-15 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1428202192


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -892,6 +895,43 @@ public void replay(
 }
 }
 
+/**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+public void completeTransaction(
+long producerId,
+TransactionResult result
+) throws RuntimeException {
+Offsets pendingOffsets = 
pendingTransactionalOffsets.remove(producerId);
+
+if (result == TransactionResult.COMMIT) {
+log.debug("Committed transactional offset commits for producer id 
{}.", producerId);
+if (pendingOffsets == null) return;
+
+pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+topicOffsets.forEach((topicName, partitionOffsets) -> {
+partitionOffsets.forEach((partitionId, offsetAndMetadata) 
-> {
+log.debug("Committed transaction offset commit for 
producer id {} in group {} " +
+"with topic {}, partition {}, and offset {}.",
+producerId, groupId, topicName, partitionId, 
offsetAndMetadata);
+offsets.put(
+groupId,
+topicName,
+partitionId,
+offsetAndMetadata
+);

Review Comment:
   I think the word uncommitted is overloaded here. There is the txn commit 
(marker) and the write commit (returning from the leader once all (in-sync) 
replicas have replicated). 
   
   I assume we only move the offsets over when we see the txn commit, but they 
will still be unavailable until the write commit completes (as any offsets are 
unavailable until the write completes). 
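
The first of the two meanings, the transaction marker moving pending offsets over, can be sketched in isolation. This is a simplified illustration and not the OffsetMetadataManager code: the class name, the string keys, and the `Result` enum are hypothetical, and the second stage discussed above (offsets staying unavailable until the write itself is replicated) is deliberately not modelled.

```java
import java.util.HashMap;
import java.util.Map;

final class PendingOffsetsSketch {
    enum Result { COMMIT, ABORT }

    // Offsets visible as committed, keyed by "group/topic/partition" for brevity.
    private final Map<String, Long> committed = new HashMap<>();
    // Offsets written inside an open transaction, keyed by producer id.
    private final Map<Long, Map<String, Long>> pending = new HashMap<>();

    void addPending(long producerId, String key, long offset) {
        pending.computeIfAbsent(producerId, id -> new HashMap<>()).put(key, offset);
    }

    // Applies a transaction marker: COMMIT moves the producer's pending
    // offsets into the committed map, ABORT simply drops them.
    void completeTransaction(long producerId, Result result) {
        Map<String, Long> offsets = pending.remove(producerId);
        if (offsets == null || result != Result.COMMIT) {
            return;
        }
        committed.putAll(offsets);
    }

    // Returns null when no committed offset exists for the key.
    Long committedOffset(String key) {
        return committed.get(key);
    }
}
```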






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-15 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1428196798


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -299,14 +304,14 @@ class TransactionsTest extends IntegrationTestHarness {
 
   @nowarn("cat=deprecation")
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
+  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
   def testSendOffsetsWithGroupId(quorum: String): Unit = {
 sendOffset((producer, groupId, consumer) =>
   
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
groupId))
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
+  @ValueSource(strings = Array(/*"zk", "kraft",*/ "kraft+kip848"))

Review Comment:
   Does this still need to be fixed?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-15 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1428195220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -892,6 +895,43 @@ public void replay(
 }
 }
 
+/**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+public void completeTransaction(
+long producerId,
+TransactionResult result
+) throws RuntimeException {
+Offsets pendingOffsets = 
pendingTransactionalOffsets.remove(producerId);
+
+if (result == TransactionResult.COMMIT) {
+log.debug("Committed transactional offset commits for producer id 
{}.", producerId);
+if (pendingOffsets == null) return;
+
+pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {

Review Comment:
   oh right -- that makes sense. Thanks for clarifying.






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-15 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1428194671


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
 
 memoryRecords.batches.forEach { batch =>
   if (batch.isControlBatch) {
-throw new IllegalStateException("Control batches are not 
supported yet.")
+batch.asScala.foreach { record =>

Review Comment:
   `replayEndTransactionMarker` seems like a good compromise.






[jira] [Commented] (KAFKA-16003) The znode /config/topics is not updated during KRaft migration in "dual-write" mode

2023-12-15 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797249#comment-17797249
 ] 

Mickael Maison commented on KAFKA-16003:


[~davidarthur] Yes this is what I had in mind. I should have a PR ready later 
on today.

> The znode /config/topics is not updated during KRaft migration in 
> "dual-write" mode
> ---
>
> Key: KAFKA-16003
> URL: https://issues.apache.org/jira/browse/KAFKA-16003
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.6.1
>Reporter: Paolo Patierno
>Assignee: Mickael Maison
>Priority: Major
>
> I tried the following scenario ...
> I have a ZooKeeper-based cluster and create a my-topic-1 topic (without 
> specifying any specific configuration for it). The correct znodes are created 
> under /config/topics and /brokers/topics.
> I start a migration to KRaft but do not move past "dual-write" mode. 
> While in this mode, I create a new my-topic-2 topic (still without any 
> specific config). I see that a new znode is created under /brokers/topics but 
> NOT under /config/topics. It seems that the KRaft controller is not updating 
> this information in ZooKeeper during the dual-write. The controller log shows 
> that a write to ZooKeeper was done, but apparently not all of the expected writes:
> {code:java}
> 2023-12-13 10:23:26,229 TRACE [KRaftMigrationDriver id=3] Create Topic 
> my-topic-2, ID Macbp8BvQUKpzmq2vG_8dA. Transitioned migration state from 
> ZkMigrationLeadershipState{kraftControllerId=3, kraftControllerEpoch=7, 
> kraftMetadataOffset=445, kraftMetadataEpoch=7, 
> lastUpdatedTimeMs=1702462785587, migrationZkVersion=236, controllerZkEpoch=3, 
> controllerZkVersion=3} to ZkMigrationLeadershipState{kraftControllerId=3, 
> kraftControllerEpoch=7, kraftMetadataOffset=445, kraftMetadataEpoch=7, 
> lastUpdatedTimeMs=1702462785587, migrationZkVersion=237, controllerZkEpoch=3, 
> controllerZkVersion=3} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-3-migration-driver-event-handler]
> 2023-12-13 10:23:26,229 DEBUG [KRaftMigrationDriver id=3] Made the following 
> ZK writes when handling KRaft delta: {CreateTopic=1} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-3-migration-driver-event-handler] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-15 Thread via GitHub


nicktelford commented on PR #14853:
URL: https://github.com/apache/kafka/pull/14853#issuecomment-1858152008

   The build still appears to be failing, this time with an error from git 
about updating the HEAD... It doesn't look like other PRs are affected by this, 
so I'm guessing the build is cached per-PR and something has borked the cached 
build for this PR.
   
   Shall I just close this PR and open a new one from the same branch? Might 
give it a fresh build that can then succeed.





Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-15 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1428127814


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
   I don't believe these changes are necessary here since the portions they 
address are not affected by the PR. If you would like to do this cleanup in a 
separate PR, I'd be happy to review.
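
   For readers following along, the pattern in the hunk above (copy the shared 
latch reference while holding the lock, then await on the local copy outside 
the lock) can be sketched in isolation roughly as follows. This is only an 
illustration, not code from the PR: the class name BlockSignal and the methods 
reset() and markBlocked() are made up for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the test's Block class; not the real BlockingConnectorTest code.
final class BlockSignal {
    // Shared latch reference; reads and writes are guarded by BlockSignal.class.
    private static CountDownLatch awaitBlockLatch;

    // Installs a fresh latch, as creating a new connector/task would.
    static void reset() {
        synchronized (BlockSignal.class) {
            awaitBlockLatch = new CountDownLatch(1);
        }
    }

    // Called by the code under test right before it starts blocking.
    static void markBlocked() {
        CountDownLatch latch;
        synchronized (BlockSignal.class) {
            latch = awaitBlockLatch;
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    // Copies the reference under the lock, then waits without holding the lock.
    static void waitForBlock(long timeoutMs) throws InterruptedException, TimeoutException {
        CountDownLatch latch;
        synchronized (BlockSignal.class) {
            latch = awaitBlockLatch;
        }
        if (latch == null) {
            throw new IllegalArgumentException("No connector has been created yet");
        }
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting for connector to block.");
        }
    }
}
```

   Waiting on the local copy means the lock is held only for the reference 
read, so a thread calling markBlocked() is never stuck behind a waiter.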






[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Description: 
Using *kafka-leader-election.sh*, I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side, as the controller always 
finished all the Kafka leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

 

So I checked, and this command has an option to pass configurations:
{code:java}
Option                                  Description
--                                  ---
--admin.config     Configuration properties files to pass
                                          to the admin client {code}
I created the file in order to increase the values of *request.timeout.ms* 
and *default.api.timeout.ms*. Even after increasing these values I got the 
same result: timeouts were still happening, as if the new values had no 
effect. 

So I checked the source code and came across a bug: no matter what values 
we pass for the timeouts, the default values were ALWAYS overwriting them.

 

This is the [3.6 
branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default timeout is 30 seconds and the request timeout is 
30/2, which explains the 15-second timeout.

Also, we can see in the code how the custom values passed via the config file 
are overwritten by the defaults.

 

 

The proposed fix is simple: we need to use the default values only when the 
timeouts are not defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
      } {code}
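
The same "apply a default only when the key is absent" idea can be shown as a 
small standalone Java sketch that needs nothing but the JDK. It is an 
illustration under assumptions, not the patch itself: the class 
AdminTimeoutDefaults and the method withDefaultTimeouts are hypothetical names, 
and the two string constants simply repeat the config names mentioned above.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: fills in the tool's default timeouts
// only for keys that the user-supplied admin config did not set.
final class AdminTimeoutDefaults {
    // Config names as quoted in this report.
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";
    static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";

    // Returns a copy of userProps with missing timeouts filled in:
    // timeoutMs for the API timeout and timeoutMs / 2 for the request timeout.
    static Properties withDefaultTimeouts(Properties userProps, long timeoutMs) {
        Properties props = new Properties();
        props.putAll(userProps);
        if (!props.containsKey(DEFAULT_API_TIMEOUT_MS)) {
            props.setProperty(DEFAULT_API_TIMEOUT_MS, Long.toString(timeoutMs));
        }
        if (!props.containsKey(REQUEST_TIMEOUT_MS)) {
            props.setProperty(REQUEST_TIMEOUT_MS, Long.toString(timeoutMs / 2));
        }
        return props;
    }
}
```

With a 30000 ms default and an empty config this yields 30000 and 15000, which 
matches the 15-second timeouts observed; a value that is already present in 
the config file is left untouched.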
 

I tested it and now I am able to modify the timeouts, so my application can 
capture the result of the command properly.

 


[jira] [Commented] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797219#comment-17797219
 ] 

Sergio Troiano commented on KAFKA-16015:


Thanks as usual [~pprovenzano]  :)

 

I will then open only the PR for trunk, and I will update the ticket when the 
PR is ready.

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 3.5.1, 3.6.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>

[jira] [Commented] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797218#comment-17797218
 ] 

Proven Provenzano commented on KAFKA-16015:
---

Hi [~sergio_troi...@hotmail.com] 

I would suggest opening a PR for trunk first so that it can then be 
cherry-picked to 3.7 branch before code freeze.

The 3.6 and 3.5 point releases have already shipped and this isn't a security 
issue that needs to be immediately addressed so we have more time to get the 
fix into those branches.

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 3.5.1, 3.6.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>

[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a network error which triggers a 
{{TaskCorruptedException}}.
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received, the revived task is restoring.
4. When the sync is received, the revived task is revoked and a new rebalance 
is triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares the 
same state directory.)
6. The state of the task is missing some records.

To mitigate the issue, one can restart the stream thread and delete the 
state on disk. After that, the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since the state directory is wiped out in step 2, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3, which might be fewer. The underlying reason is that the 
offsets in the record collector are not cleaned up when the record collector is 
closed. 
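
To make the root cause concrete, here is a deliberately simplified Java model 
of a collector that remembers the highest acknowledged changelog offset per 
partition and forgets it on close. It is a toy sketch, not the real 
{{RecordCollectorImpl}}: the class ToyRecordCollector and its method names are 
invented for illustration, and it assumes the same collector instance is 
reused when the task is revived.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only; the real record collector has far more state and logic.
final class ToyRecordCollector {
    // Highest acknowledged changelog offset per changelog partition.
    private final Map<String, Long> offsets = new HashMap<>();

    // Records the offset returned when a changelog write is acknowledged.
    void onSendAcknowledged(String changelogPartition, long offset) {
        offsets.merge(changelogPartition, offset, Math::max);
    }

    // Snapshot of the offsets that a checkpoint would be built from.
    Map<String, Long> offsets() {
        return new HashMap<>(offsets);
    }

    // Clearing here is the cleanup described above: without it, a revived task
    // could checkpoint offsets for records that its wiped state no longer holds.
    void close() {
        offsets.clear();
    }
}
```

In this model, a checkpoint written after close() and a partial restore can 
only contain what was restored or sent since the revival, not the stale 
pre-corruption offsets.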

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

The repro can be started with

{code}
./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
{code}

The repro writes records into a state store and tries to retrieve them again 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
 It will throw an {{IllegalStateException}} if it cannot find a record in the 
state 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
 Once the offsets in the record collector are cleared on close 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
 and 
https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
 the {{IllegalStateException}} does not occur anymore.

In the logs you can check for:
- {{Restore batch end offset is}}, which shows the restored offsets in the state.
- {{task [0_1] Writing checkpoint:}}, which shows the written checkpoints.
- {{task [0_1] Checkpointable offsets}}, which shows the offsets that come from 
sending the records to the changelog topic 
{{RestoreIntegrationTesttest-stateStore-changelog-1}}.

Always look at the last instances of these before the {{IllegalStateException}} 
is thrown.

You will see that the restored offsets are lower than the offsets that are 
written to the checkpoint. The offsets written to the checkpoint come from the 
offsets stored when sending the records to the changelog topic.  




[jira] [Commented] (KAFKA-16003) The znode /config/topics is not updated during KRaft migration in "dual-write" mode

2023-12-15 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797215#comment-17797215
 ] 

David Arthur commented on KAFKA-16003:
--

Ah great. Thanks again for the report [~ppatierno].

[~mimaison] thanks for picking this one up. I did look into it briefly 
yesterday and I think the simplest thing is to create the znode in 
ZkTopicMigrationClient#createTopic when we create the main topic ZNode. The 
topic configs (if any) will later get written by ZkConfigMigrationClient. 
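
A rough way to picture that suggestion, using an in-memory map instead of 
ZooKeeper: when the topic znode is created, an empty config znode is created 
alongside it, and a later config write simply replaces its contents. Everything 
here is hypothetical (the class ToyZkTopicWriter, its methods, and the "{}" 
placeholder payloads); it is not the ZkTopicMigrationClient API.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy in-memory stand-in for ZooKeeper: maps znode paths to their data.
final class ToyZkTopicWriter {
    private final Map<String, String> znodes = new TreeMap<>();

    // Creates the main topic znode and, in the same step, an empty config znode.
    void createTopic(String topic) {
        znodes.put("/brokers/topics/" + topic, "{}");
        // Placeholder config so the topic is visible under /config/topics right away.
        znodes.putIfAbsent("/config/topics/" + topic, "{}");
    }

    // Later config write; replaces whatever placeholder was created earlier.
    void writeTopicConfig(String topic, String configJson) {
        znodes.put("/config/topics/" + topic, configJson);
    }

    boolean exists(String path) {
        return znodes.containsKey(path);
    }

    String data(String path) {
        return znodes.get(path);
    }
}
```

With this shape, a topic created without any specific config still ends up 
with entries under both /brokers/topics and /config/topics.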

> The znode /config/topics is not updated during KRaft migration in 
> "dual-write" mode
> ---
>
> Key: KAFKA-16003
> URL: https://issues.apache.org/jira/browse/KAFKA-16003
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.6.1
>Reporter: Paolo Patierno
>Assignee: Mickael Maison
>Priority: Major
>


[jira] [Comment Edited] (KAFKA-16003) The znode /config/topics is not updated during KRaft migration in "dual-write" mode

2023-12-15 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797215#comment-17797215
 ] 

David Arthur edited comment on KAFKA-16003 at 12/15/23 2:54 PM:


Ah, great! Thanks again for the report [~ppatierno].

[~mimaison] thanks for picking this one up. I did look into it briefly 
yesterday and I think the simplest thing is to create the znode in 
ZkTopicMigrationClient#createTopic when we create the main topic ZNode. The 
topic configs (if any) will later get written by ZkConfigMigrationClient. 



> The znode /config/topics is not updated during KRaft migration in 
> "dual-write" mode
> ---
>
> Key: KAFKA-16003
> URL: https://issues.apache.org/jira/browse/KAFKA-16003
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.6.1
>Reporter: Paolo Patierno
>Assignee: Mickael Maison
>Priority: Major
>


[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a network error which triggers a 
{{TaskCorruptedException}}.
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received, the revived task is restoring.
4. When the sync is received, the revived task is revoked and a new rebalance 
is triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares the 
same state directory.)
6. The state of the task is missing some records.

To mitigate the issue, one can restart the stream thread and delete the 
state on disk. After that, the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since the state directory is wiped out in step 2, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3, which might be fewer. The underlying reason is that the 
offsets in the record collector are not cleaned up when the record collector is 
closed. 

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.



> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which it 
> was revoked, reads the checkpoint and starts restoring. (It might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory).
> 6. The state of the task misses some records
> To mitigate the 

[jira] [Assigned] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16017:
-

Assignee: Bruno Cadonna

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which it 
> was revoked, reads the checkpoint and starts restoring. (It might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory).
> 6. The state of the task misses some records
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3, which might be fewer. The root cause of 
> this is that the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which it 
was revoked, reads the checkpoint and starts restoring. (It might be enough if 
the task moves to a stream thread on the same Streams client that shares the 
same state directory).
6. The state of the task misses some records

To mitigate the issue one can restart the stream thread and delete the 
state on disk. After that the state restores completely from the changelog 
topic and the state does not miss any records anymore.

The root cause is that the checkpoint that is written in step 4 contains the 
offset of the 

https://github.com/cadonna/kafka/tree/KAFKA-16017

  was:Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.


> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which it 
> was revoked, reads the checkpoint and starts restoring. (It might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory).
> 6. The state of the task misses some records
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset of the 
> https://github.com/cadonna/kafka/tree/KAFKA-16017





Re: [PR] MINOR: transform new coordinator error before returning to client [kafka]

2023-12-15 Thread via GitHub


dajac merged PR #15001:
URL: https://github.com/apache/kafka/pull/15001





Re: [PR] MINOR: transform new coordinator error before returning to client [kafka]

2023-12-15 Thread via GitHub


dajac commented on PR #15001:
URL: https://github.com/apache/kafka/pull/15001#issuecomment-1857981350

   Merged to trunk and 3.7.





Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on PR #14375:
URL: https://github.com/apache/kafka/pull/14375#issuecomment-1857973003

   @showuon thank you so much for reviewing the PR. I believe I have addressed the 
comments that I can. Please review again when you get a chance. Thank you. 





Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428034292


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
 threadNamePrefix: Option[String],
 quotaManager: ReplicationQuotaManager
   ): ReplicaFetcherManager = {
-mockReplicaFetcherManager.getOrElse {
-  super.createReplicaFetcherManager(
-metrics,
-time,
-threadNamePrefix,
-quotaManager
-  )
+  mockReplicaFetcherManager.getOrElse {

Review Comment:
   Addressed






Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428033100


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
 threadNamePrefix: Option[String],
 quotaManager: ReplicationQuotaManager
   ): ReplicaFetcherManager = {
-mockReplicaFetcherManager.getOrElse {
-  super.createReplicaFetcherManager(
-metrics,
-time,
-threadNamePrefix,
-quotaManager
-  )
+  mockReplicaFetcherManager.getOrElse {
+if (buildRemoteLogAuxState) {
+  super.createReplicaFetcherManager(
+metrics,
+time,
+threadNamePrefix,
+quotaManager
+  )
+  val config = this.config
+  val metadataCache = this.metadataCache
+  new ReplicaFetcherManager(config, this, metrics, time, 
threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) 
{
+override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
+  val prefix = threadNamePrefix.map(tp => 
s"$tp:").getOrElse("")
+  val threadName = 
s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+
+  val tp = new TopicPartition(topic, 0)
+  val leader = new MockLeaderEndPoint() {
+override def fetch(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+  Map(tp -> new 
FetchData().setErrorCode(Errors.OFFSET_MOVED_TO_TIERED_STORAGE.code))
+}
+  }
+  leader.setLeaderState(tp, PartitionState(leaderEpoch = 0))
+  leader.setReplicaPartitionStateCallback(tp => 
PartitionState(leaderEpoch = 0))
+
+  val fetcher = new ReplicaFetcherThread(threadName, leader, 
config, failedPartitions, replicaManager,
+quotaManager, "", () => config.interBrokerProtocolVersion)
+
+  val initialFetchState = InitialFetchState(
+topicId = Some(Uuid.randomUuid()),
+leader = leader.brokerEndPoint(),
+currentLeaderEpoch = 0,
+initOffset = 0)
+
+  fetcher.addPartitions(Map(tp -> initialFetchState))
+
+  fetcher
+}
+  }
+} else {
+  super.createReplicaFetcherManager(
+metrics,
+time,
+threadNamePrefix,
+quotaManager
+  )

Review Comment:
   This is not a duplication but an intended change. It cannot be done that way 
because of the mismatching return type. 






Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428033100


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3205,13 +3220,52 @@ class ReplicaManagerTest {
 threadNamePrefix: Option[String],
 quotaManager: ReplicationQuotaManager
   ): ReplicaFetcherManager = {
-mockReplicaFetcherManager.getOrElse {
-  super.createReplicaFetcherManager(
-metrics,
-time,
-threadNamePrefix,
-quotaManager
-  )
+  mockReplicaFetcherManager.getOrElse {
+if (buildRemoteLogAuxState) {
+  super.createReplicaFetcherManager(
+metrics,
+time,
+threadNamePrefix,
+quotaManager
+  )
+  val config = this.config
+  val metadataCache = this.metadataCache
+  new ReplicaFetcherManager(config, this, metrics, time, 
threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) 
{
+override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
+  val prefix = threadNamePrefix.map(tp => 
s"$tp:").getOrElse("")
+  val threadName = 
s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+
+  val tp = new TopicPartition(topic, 0)
+  val leader = new MockLeaderEndPoint() {
+override def fetch(fetchRequest: FetchRequest.Builder): 
Map[TopicPartition, FetchData] = {
+  Map(tp -> new 
FetchData().setErrorCode(Errors.OFFSET_MOVED_TO_TIERED_STORAGE.code))
+}
+  }
+  leader.setLeaderState(tp, PartitionState(leaderEpoch = 0))
+  leader.setReplicaPartitionStateCallback(tp => 
PartitionState(leaderEpoch = 0))
+
+  val fetcher = new ReplicaFetcherThread(threadName, leader, 
config, failedPartitions, replicaManager,
+quotaManager, "", () => config.interBrokerProtocolVersion)
+
+  val initialFetchState = InitialFetchState(
+topicId = Some(Uuid.randomUuid()),
+leader = leader.brokerEndPoint(),
+currentLeaderEpoch = 0,
+initOffset = 0)
+
+  fetcher.addPartitions(Map(tp -> initialFetchState))
+
+  fetcher
+}
+  }
+} else {
+  super.createReplicaFetcherManager(
+metrics,
+time,
+threadNamePrefix,
+quotaManager
+  )

Review Comment:
   This cannot be done, due to the mismatching return type. 






Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428032730


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -229,6 +229,9 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 Partition partition = 
replicaMgr.getPartitionOrException(topicPartition);
 partition.truncateFullyAndStartAt(nextOffset, false, 
Option.apply(leaderLogStartOffset));
 
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+

Review Comment:
   The original reason was to avoid emitting the error metrics when 
buildRemoteLogAuxState fails because remote log storage is not 
enabled. However, thinking about this again, the suggested approach makes more 
sense. We will always emit request metrics when we attempt to build the remote 
log aux state, and emit error metrics when there is an exception. 
   
   I made the suggested change. 
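
   The agreed behaviour can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `AuxStateMetricsSketch`, its two counters and the `remoteStorageEnabled` flag are hypothetical stand-ins for the real meters in `BrokerTopicStats`. The request counter is marked on every attempt, and the error counter only when the attempt throws.

```java
import java.util.concurrent.atomic.AtomicLong;

public class AuxStateMetricsSketch {

    // Hypothetical counters standing in for the request and error meters.
    final AtomicLong requestCount = new AtomicLong();
    final AtomicLong errorCount = new AtomicLong();

    // Returns true if the (simulated) aux state build succeeded.
    boolean buildAuxState(boolean remoteStorageEnabled) {
        // Every attempt counts as a request, whether or not it succeeds.
        requestCount.incrementAndGet();
        try {
            if (!remoteStorageEnabled) {
                throw new IllegalStateException("remote log storage is not enabled");
            }
            return true;
        } catch (RuntimeException e) {
            // Any exception during the build counts as a failure.
            errorCount.incrementAndGet();
            return false;
        }
    }

    public static void main(String[] args) {
        AuxStateMetricsSketch metrics = new AuxStateMetricsSketch();
        metrics.buildAuxState(true);
        metrics.buildAuxState(false);
        System.out.println("requests=" + metrics.requestCount.get()
            + " errors=" + metrics.errorCount.get());
    }
}
```

   With one successful and one failing attempt, the sketch ends with two requests and one error.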






Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428029820


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1042,8 +1053,9 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 // If the follower HAS NOT picked up the changes, and they become 
the leader then they will go through this process
 // again and delete them with the original deletion reason i.e. 
size, time or log start offset breach.
 List undeletedSegments = new ArrayList<>();
+String topicName = log.topicPartition().topic();

Review Comment:
   Done.






Re: [PR] KAFKA-15158: Add metrics for RemoteRequestsPerSec [kafka]

2023-12-15 Thread via GitHub


tinaselenge commented on code in PR #14375:
URL: https://github.com/apache/kafka/pull/14375#discussion_r1428029215


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3884,6 +3938,116 @@ class ReplicaManagerTest {
 }
   }
 
+  @Test
+  def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+
+val remoteLogManager = mock(classOf[RemoteLogManager])
+val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), 
anyLong())).thenReturn(
+  Optional.of(remoteLogSegmentMetadata)
+)
+val storageManager = mock(classOf[RemoteStorageManager])
+when(storageManager.fetchIndex(any(), any())).thenReturn(new 
ByteArrayInputStream("0".getBytes()))
+when(remoteLogManager.storageManager()).thenReturn(storageManager)
+
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteLogManager = Some(remoteLogManager), 
buildRemoteLogAuxState = true)
+try {
+
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp0.topic -> topicId).asJava
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp0.topic)
+.setPartitionIndex(tp0.partition)
+.setControllerEpoch(0)
+.setLeader(1)
+.setLeaderEpoch(0)
+.setIsr(partition0Replicas)
+.setPartitionEpoch(0)
+.setReplicas(partition0Replicas)
+.setIsNew(true)
+).asJava,
+topicIds,
+Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+  // Verify the metrics for build remote log state and for failures is 
zero before replicas start to fetch
+  assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
+  assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+  // Verify aggregate metrics
+  assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
+  assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+
+  replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+  // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
+  
assertTrue(brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count
 > 0)
+  assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+  // Verify aggregate metrics
+  
assertTrue(brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count
 > 0)
+  assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testFailedBuildRemoteLogAuxStateMetrics(): Unit = {

Review Comment:
   `
  when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), 
anyLong())).thenReturn(
 Optional.empty())
   `
   This is basically what testFailedBuildRemoteLogAuxStateMetrics() already 
does. Because we are not mocking remoteLogSegmentMetadata,  
fetchRemoteLogSegmentMetadata returns an empty result. 
   
   However, I added another test for the exception part. 






[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: Streams checkpoints the wrong offset when a task is revived 
after a {{TaskCorruptedException}} and the task is then migrated to another 
stream thread during restoration.

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.





[jira] [Created] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16017:
-

 Summary: Checkpointed offset is incorrect when task is revived and 
restoring 
 Key: KAFKA-16017
 URL: https://issues.apache.org/jira/browse/KAFKA-16017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna








Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah commented on PR #15007:
URL: https://github.com/apache/kafka/pull/15007#issuecomment-1857890630

   Thanks for taking a look @jsancio! I'll answer some related questions here. 
If an error occurs inside the `readAllMetadata` call in KRaftMigrationDriver, 
the catch block will explicitly abort the transaction. If the controller 
crashes during this call or any time before committing the 
EndTransactionRecord, then the next active controller will abort the partial 
transaction. This is the same case as a controller not crashing but losing 
leadership during the migration (for example, because of a timeout). 
   
   The logic for detecting and aborting partial transactions is in 
ActivationRecordsGenerator.
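
   The two abort paths described above can be modelled with a toy log. This is only a sketch under assumptions: the `Marker` enum, `migrate` and `onActivation` are hypothetical and do not reproduce the actual logic in KRaftMigrationDriver or ActivationRecordsGenerator. It shows an explicit abort in a catch block, and a newly active controller aborting a transaction that was left open.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialTransactionSketch {

    // Hypothetical simplification of the records in the metadata log.
    enum Marker { BEGIN, RECORD, END, ABORT }

    // True if the most recent BEGIN has no END or ABORT after it.
    static boolean hasOpenTransaction(List<Marker> log) {
        boolean open = false;
        for (Marker m : log) {
            if (m == Marker.BEGIN) {
                open = true;
            } else if (m == Marker.END || m == Marker.ABORT) {
                open = false;
            }
        }
        return open;
    }

    // Path 1: an error while reading metadata is caught and aborted explicitly.
    static void migrate(List<Marker> log, int records, boolean failMidway) {
        log.add(Marker.BEGIN);
        try {
            for (int i = 0; i < records; i++) {
                if (failMidway && i == records / 2) {
                    throw new IllegalStateException("reading metadata failed");
                }
                log.add(Marker.RECORD);
            }
            log.add(Marker.END);
        } catch (RuntimeException e) {
            log.add(Marker.ABORT);
        }
    }

    // Path 2: the next active controller aborts whatever was left open by a
    // crash or a loss of leadership.
    static void onActivation(List<Marker> log) {
        if (hasOpenTransaction(log)) {
            log.add(Marker.ABORT);
        }
    }

    public static void main(String[] args) {
        List<Marker> failed = new ArrayList<>();
        migrate(failed, 4, true);
        System.out.println(failed);

        List<Marker> crashed = new ArrayList<>();
        crashed.add(Marker.BEGIN);
        crashed.add(Marker.RECORD);
        onActivation(crashed);
        System.out.println(crashed);
    }
}
```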





Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1427975054


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -645,6 +648,29 @@ public void run() throws Exception {
 }
 }
 
+private BufferingBatchConsumer 
buildMigrationBatchConsumer(
+MigrationManifest.Builder manifestBuilder
+) {
+return new BufferingBatchConsumer<>(batch -> {
+try {
+if (log.isTraceEnabled()) {
+batch.forEach(apiMessageAndVersion ->
+
log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+}
+CompletableFuture future = 
zkRecordConsumer.acceptBatch(batch);
+long batchStart = time.nanoseconds();
+FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",

Review Comment:
   Thanks, the Flow API looks really cool. I'll check that out. Does look like 
it's Java 9+ only, but I'll keep it in mind for future stuff (I think we'll be 
bumping up to Java 11 for 4.0)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16007 Merge batch records during ZK migration [kafka]

2023-12-15 Thread via GitHub


mumrah commented on code in PR #15007:
URL: https://github.com/apache/kafka/pull/15007#discussion_r1427972960


##
metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A record batch consumer that merges incoming batches into batches of a 
minimum a given size. It does so
+ * by buffering the records into an array that is later flushed to a 
downstream consumer. Batches consumed
+ * by this class will not be broken apart, only combined with other batches to 
reach the minimum batch size.
+ * 
+ * Note that {@link #close()} must be called after the last batch has been 
accepted in order to flush any
+ * buffered records.
+ */
+public class BufferingBatchConsumer<T> implements Consumer<List<T>> {
+
+private final Consumer<List<T>> delegateConsumer;
+private final List<T> bufferedBatch;
+private final int minBatchSize;
+
+BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int 
minBatchSize) {
+this.delegateConsumer = delegateConsumer;
+this.bufferedBatch = new ArrayList<>(minBatchSize);
+this.minBatchSize = minBatchSize;
+}
+
+@Override
+public void accept(List<T> batch) {
+bufferedBatch.addAll(batch);
+if (bufferedBatch.size() >= minBatchSize) {
+delegateConsumer.accept(new ArrayList<>(bufferedBatch));
+bufferedBatch.clear();
+}
+}
+
+public void close() {

Review Comment:
   Yea, good point. I'll rename this to flush
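
   For reference, the buffering behaviour with the method renamed to flush might look roughly like this. It is a self-contained sketch, not the code in the PR: the class name `Buffering` and the body of `flush()` are assumptions, since the quoted diff stops before the body of the original `close()` method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BufferingSketch {

    // Merges incoming batches until at least minBatchSize records are buffered,
    // then hands one combined batch to the delegate.
    static class Buffering<T> implements Consumer<List<T>> {
        final Consumer<List<T>> delegate;
        final List<T> buffer;
        final int minBatchSize;

        Buffering(Consumer<List<T>> delegate, int minBatchSize) {
            this.delegate = delegate;
            this.buffer = new ArrayList<>(minBatchSize);
            this.minBatchSize = minBatchSize;
        }

        @Override
        public void accept(List<T> batch) {
            buffer.addAll(batch);
            if (buffer.size() >= minBatchSize) {
                delegate.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        // Assumed behaviour: deliver whatever is still buffered, if anything.
        public void flush() {
            if (!buffer.isEmpty()) {
                delegate.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> delivered = new ArrayList<>();
        Buffering<Integer> consumer =
            new Buffering<Integer>(batch -> delivered.add(batch), 3);

        consumer.accept(Arrays.asList(1, 2)); // below the minimum, stays buffered
        consumer.accept(Arrays.asList(3, 4)); // 4 >= 3, one merged batch is delivered
        consumer.accept(Arrays.asList(5));    // buffered again
        consumer.flush();                     // delivers the remainder

        System.out.println(delivered);        // prints [[1, 2, 3, 4], [5]]
    }
}
```

   The example also shows why the caller has to call flush after the last batch: without it the trailing record would never reach the delegate.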






[jira] [Resolved] (KAFKA-15971) Re-enable consumer integration tests for new consumer

2023-12-15 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15971.
-
Resolution: Fixed

> Re-enable consumer integration tests for new consumer
> -
>
> Key: KAFKA-15971
> URL: https://issues.apache.org/jira/browse/KAFKA-15971
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.7.0
>
>
> Re-enable the consumer integration tests for the new consumer making sure 
> that build stability is not impacted.





Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]

2023-12-15 Thread via GitHub


dajac merged PR #14925:
URL: https://github.com/apache/kafka/pull/14925





Re: [PR] MINOR Minor Cleanup KRaft code [kafka]

2023-12-15 Thread via GitHub


jlprat commented on PR #15002:
URL: https://github.com/apache/kafka/pull/15002#issuecomment-1857846340

   Failures in CI seem unrelated to the change and they pass locally.
   
   Cherry-picked the change to `3.7` as well (cc @stanislavkozlovski).





Re: [PR] MINOR Minor Cleanup KRaft code [kafka]

2023-12-15 Thread via GitHub


jlprat merged PR #15002:
URL: https://github.com/apache/kafka/pull/15002





Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


kamalcph commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1857818458

   I'll provide a review today.





Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2023-12-15 Thread via GitHub


MikeEdgar commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1857793048

   @jolshan , I've updated the test and it's passing, but the CI is failing on 
a seemingly unrelated test.
   
   ```
   Gradle Test Run :storage:test > Gradle Test Executor 76 > 
TransactionsWithTieredStoreTest > testCommitTransactionTimeout(String) > 
testCommitTransactionTimeout(String).quorum=kraft FAILED
   
   org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
3000ms while awaiting InitProducerId
   ```





Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1857734261

   @clolov , there are checkstyle errors. Please fix them: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15005/7/pipeline
 . Thanks.





Re: [PR] KAFKA-16013: impelement ExpiresPerSec for remote fetch [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15015:
URL: https://github.com/apache/kafka/pull/15015#issuecomment-1857731719

   @satishd , addressed your comment in this PR. Thanks.
   https://github.com/apache/kafka/pull/15014#discussion_r1426937685





Re: [PR] KAFKA-16013: Add metric for expiration rate of delayed remote fetch [kafka]

2023-12-15 Thread via GitHub


showuon merged PR #15014:
URL: https://github.com/apache/kafka/pull/15014





Re: [PR] KAFKA-16013: Add metric for expiration rate of delayed remote fetch [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15014:
URL: https://github.com/apache/kafka/pull/15014#issuecomment-1857712774

   Failed tests are unrelated.





[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Description: 
Using the *kafka-leader-election.sh* I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side; the controller always 
completed all the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

 

So I checked, and this command has an option to pass configuration properties:
{code:java}
Option                                  Description
--                                  ---
--admin.config     Configuration properties files to pass
                                          to the admin client {code}
I created the file in order to increase the values of *request.timeout.ms* 
and *default.api.timeout.ms*. Even after increasing these values I got the 
same result: the timeouts kept happening, as if the new values had no effect.

So I checked the source code and came across a bug: no matter what values we 
pass for the timeouts, the default values ALWAYS overwrite them.

 

This is the [3.6 
branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default API timeout is 30 seconds and the request timeout is 
30/2 = 15 seconds, which explains the timeouts observed after about 15 seconds.

We can also see in the code how the custom values loaded from the config file 
are overwritten by the defaults.

 

 

The proposed fix is simple: use the default values only when the timeouts 
are not defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
(timeout.toMillis / 2).toString)
      } {code}
 

I tested it, and I am now able to modify the timeouts so that my application 
can capture the result of the command properly.
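
As a sketch of the proposal above, the same guard can be exercised in plain Java with `java.util.Properties`, without a Kafka dependency. The class `TimeoutDefaults` and its method names are hypothetical and exist only for illustration; the two keys are the admin client settings named in this report, and the 30-second value mirrors the `30.second` passed to `run`. It contrasts the unconditional assignment with the guarded one, and is not the actual patch.

```java
import java.util.Properties;

// Illustrative only: TimeoutDefaults and its methods are hypothetical names,
// not part of Kafka. The keys are the admin client settings from the report.
final class TimeoutDefaults {
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";
    static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";

    // Behaviour described in the report: the tool's defaults always win.
    static void overwriteWithDefaults(Properties props, long timeoutMs) {
        props.setProperty(DEFAULT_API_TIMEOUT_MS, Long.toString(timeoutMs));
        props.setProperty(REQUEST_TIMEOUT_MS, Long.toString(timeoutMs / 2));
    }

    // Proposed behaviour: only fill in values the config file did not set.
    static void applyDefaultsIfAbsent(Properties props, long timeoutMs) {
        if (!props.containsKey(DEFAULT_API_TIMEOUT_MS)) {
            props.setProperty(DEFAULT_API_TIMEOUT_MS, Long.toString(timeoutMs));
        }
        if (!props.containsKey(REQUEST_TIMEOUT_MS)) {
            props.setProperty(REQUEST_TIMEOUT_MS, Long.toString(timeoutMs / 2));
        }
    }

    public static void main(String[] args) {
        // Stand-in for properties loaded from the --admin.config file.
        Properties fromFile = new Properties();
        fromFile.setProperty(REQUEST_TIMEOUT_MS, "60000");

        Properties overwritten = new Properties();
        overwritten.putAll(fromFile);
        overwriteWithDefaults(overwritten, 30_000L);
        System.out.println(overwritten.getProperty(REQUEST_TIMEOUT_MS)); // 15000

        Properties guarded = new Properties();
        guarded.putAll(fromFile);
        applyDefaultsIfAbsent(guarded, 30_000L);
        System.out.println(guarded.getProperty(REQUEST_TIMEOUT_MS));     // 60000
        System.out.println(guarded.getProperty(DEFAULT_API_TIMEOUT_MS)); // 30000
    }
}
```

With the guard, the value from the config file (60000) survives, while the key that was not set still falls back to the 30-second default.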

 

  was:
Using the 

Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]

2023-12-15 Thread via GitHub


OmniaGM commented on code in PR #14893:
URL: https://github.com/apache/kafka/pull/14893#discussion_r1427819819


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -920,13 +923,13 @@ String remoteTopicName(String topic, String clusterAlias) 
{
  * launch the connectors on kafka connect cluster and check if they are 
running
  */
 protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster 
connectCluster,
-List> connectorClasses, 
MirrorMakerConfig mm2Config, 
+List> connectorClasses, 
MirrorMakerConfig mm2Config,
 String primary, String backup) throws InterruptedException {
 for (Class connector : connectorClasses) {
 connectCluster.configureConnector(connector.getSimpleName(), 
mm2Config.connectorBaseConfig(
 new SourceAndTarget(primary, backup), connector));
 }
-

Review Comment:
   It would be nice to get rid of these newline reformatting changes, as they 
are unnecessary.






Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-15 Thread via GitHub


clolov commented on code in PR #15005:
URL: https://github.com/apache/kafka/pull/15005#discussion_r1427814012


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1073,13 +1088,29 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 .iterator();
 while (epochsToClean.hasNext()) {
 int epoch = epochsToClean.next();
+List listOfSegmentsToBeCleaned = 
new ArrayList<>();
+sizeOfDeletableSegmentsBytes = 0L;

Review Comment:
   Yup, to paraphrase - we could have leftover undeleted segments and we need 
to carry them forward in the calculations. I have amended this in subsequent 
commits!






[jira] [Created] (KAFKA-16016) Migrate utility scripts to kafka codebase

2023-12-15 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-16016:
--

 Summary: Migrate utility scripts to kafka codebase
 Key: KAFKA-16016
 URL: https://issues.apache.org/jira/browse/KAFKA-16016
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


Migrate the logic implemented in golang to kafka codebase by creating a new 
entrypoint for docker images





Re: [PR] KAFKA-16014: add RemoteLogSizeComputationTime metric [kafka]

2023-12-15 Thread via GitHub


clolov commented on code in PR #15021:
URL: https://github.com/apache/kafka/pull/15021#discussion_r1427799678


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -408,6 +409,17 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
 
   def remoteCopyBytesLag: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
 
+  def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): 
Unit = {
+val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
+brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
+  }
+
+  def removeRemoteLogSizeComputationTime(partition: Int): Unit = {
+val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
+brokerTopicAggregatedMetric.removePartition(partition)
+  }
+
+  def remoteLogSizeComputationTime: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value()

Review Comment:
   ```suggestion
 def remoteLogSizeComputationTime: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value()
 
   ```






[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Affects Version/s: 3.5.1

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 3.5.1, 3.6.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>

[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Summary: kafka-leader-election timeout values always overwritten by default 
values   (was: kafka-leader-election timeout values always overwritten b 
default values )

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 3.6.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>

[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten b default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Description: 
Using the *kafka-leader-election.sh* I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side; the controller always 
completed all the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

 

So I checked, and this command has an option to pass configuration properties:
{code:java}
Option                                  Description
--                                  ---
--admin.config     Configuration properties files to pass
                                          to the admin client {code}
I created the file in order to increase the values of *request.timeout.ms* 
and *default.api.timeout.ms*. Even after these changes, the timeouts were 
still raised after 15 seconds.

So I checked the source code and came across a bug: no matter what values we 
pass for the timeouts, the default values ALWAYS overwrite them.

 

This is the [3.6 
branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default API timeout is 30 seconds and the request timeout is 
30/2 = 15 seconds, which explains the timeouts observed after about 15 seconds.

We can also see in the code how the custom values loaded from the config file 
are overwritten by the defaults.

 

 

The proposed fix is simple: use the default values only when the timeouts 
are not defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
(timeout.toMillis / 2).toString)
      } {code}
 

I tested it, and I am now able to modify the timeouts so that my application 
can capture the result of the command properly.

 

  was:
Using the *kafka-leader-election.sh* I was getting random timeouts like these:

[jira] [Commented] (KAFKA-16015) kafka-leader-election timeout values always overwritten b default values

2023-12-15 Thread Sergio Troiano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797113#comment-17797113
 ] 

Sergio Troiano commented on KAFKA-16015:


Hi [~pprovenzano], I will open a PR against the 3.6 branch and also against 
trunk, as I see the bug is present in both. One detail: in trunk, 
*LeaderElectionCommand.scala* was rewritten in [Java 
here|https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java#L102].
If you agree, I can send PRs for both branches.

> kafka-leader-election timeout values always overwritten b default values 
> -
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 3.6.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>

[jira] [Created] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-16015:
--

 Summary: kafka-leader-election timeout values always overwritten by 
default values 
 Key: KAFKA-16015
 URL: https://issues.apache.org/jira/browse/KAFKA-16015
 Project: Kafka
  Issue Type: Bug
  Components: admin, tools
Affects Versions: 3.6.1
Reporter: Sergio Troiano
Assignee: Sergio Troiano


Using *kafka-leader-election.sh*, I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side; the controller always 
completed all of the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

 

So I checked and found that this command has an option for passing 
configuration to the admin client:
{code:java}
Option                                  Description
------                                  -----------
--admin.config     Configuration properties files to pass
                                          to the admin client {code}
I created the file in order to increase the values of *request.timeout.ms* 
and *default.api.timeout.ms*. Even after these changes, the timeouts were 
still raised after 15 seconds.

So I checked the source code and came across a bug: no matter what values we 
pass for the timeouts, the defaults ALWAYS overwrite them.

 

This is the [3.6 
branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default timeout is 30 seconds and the request timeout is 
30/2 = 15 seconds, which matches the 15-second timeouts observed.

We can also see in the code how the custom values loaded from the config file 
are overwritten by the defaults.
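
The overwrite can be reproduced without a broker. The following is only a minimal sketch using plain java.util.Properties: the object name, the helper names and the 60000 ms user value are hypothetical, and the string keys stand in for the AdminClientConfig constants so that no Kafka dependency is needed. It contrasts the unconditional setProperty calls with a variant that only fills in keys the config file left unset.

```scala
import java.util.Properties

// Hypothetical stand-alone sketch; not the actual LeaderElectionCommand code.
object TimeoutDefaultsSketch {
  // Plain string keys standing in for
  // AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG and
  // AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG.
  val DefaultApiTimeoutMs = "default.api.timeout.ms"
  val RequestTimeoutMs = "request.timeout.ms"

  // Mirrors the behaviour reported above: the tool's defaults replace
  // whatever was loaded from the config file.
  def overwrite(props: Properties, timeoutMs: Long): Properties = {
    props.setProperty(DefaultApiTimeoutMs, timeoutMs.toString)
    props.setProperty(RequestTimeoutMs, (timeoutMs / 2).toString)
    props
  }

  // Alternative: defaults are applied only to keys that are still unset.
  def fillDefaults(props: Properties, timeoutMs: Long): Properties = {
    if (!props.containsKey(DefaultApiTimeoutMs))
      props.setProperty(DefaultApiTimeoutMs, timeoutMs.toString)
    if (!props.containsKey(RequestTimeoutMs))
      props.setProperty(RequestTimeoutMs, (timeoutMs / 2).toString)
    props
  }

  // Simulates a config file that sets only request.timeout.ms (assumed value).
  def userProps(): Properties = {
    val p = new Properties()
    p.setProperty(RequestTimeoutMs, "60000")
    p
  }

  def main(args: Array[String]): Unit = {
    // User value is lost: prints 15000
    println(overwrite(userProps(), 30000L).getProperty(RequestTimeoutMs))
    // User value survives: prints 60000
    println(fillDefaults(userProps(), 30000L).getProperty(RequestTimeoutMs))
    // Unset key still receives the default: prints 30000
    println(fillDefaults(userProps(), 30000L).getProperty(DefaultApiTimeoutMs))
  }
}
```

With a 30-second tool timeout, the first call reports 15000 for request.timeout.ms regardless of the file contents, which is consistent with the 15-second timeouts seen from the client.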

 

 

The proposal is simple: use the default values only when the timeouts are not 
defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis 

Re: [PR] KAFKA-16014: add RemoteLogSizeComputationTime metric [kafka]

2023-12-15 Thread via GitHub


showuon commented on PR #15021:
URL: https://github.com/apache/kafka/pull/15021#issuecomment-1857570946

   @clolov @satishd , call for review. Thanks.





[PR] KAFKA-16014: add RemoteLogSizeComputationTime metric [kafka]

2023-12-15 Thread via GitHub


showuon opened a new pull request, #15021:
URL: https://github.com/apache/kafka/pull/15021

   Implementing `RemoteLogSizeComputationTime` metric in KIP-963.
   
   Waiting for https://github.com/apache/kafka/pull/15005 to be merged before 
adding this metric to the `removeRemoteTopicPartitionMetrics` method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]

2023-12-15 Thread via GitHub


dajac commented on PR #14925:
URL: https://github.com/apache/kafka/pull/14925#issuecomment-1857510091

   The last few builds failed due to infrastructure issues but we got a "good" 
one here: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14925/10/tests.
 Based on this, I will merge it to trunk and 3.7.





Re: [PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]

2023-12-15 Thread via GitHub


dajac commented on PR #14848:
URL: https://github.com/apache/kafka/pull/14848#issuecomment-1857508292

   Triggered a new build.





Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-15 Thread via GitHub


dajac commented on PR #14640:
URL: https://github.com/apache/kafka/pull/14640#issuecomment-1857500665

   The failed tests are not related. Merged it to trunk and 3.7.




