Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-05 Thread via GitHub


satishd commented on PR #15817:
URL: https://github.com/apache/kafka/pull/15817#issuecomment-2095255793

   Thanks @abhijeetk88 for reviewing the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-05 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590570923


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        // Restore producer snapshot
+        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+        Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner

Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-05 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590567527


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1858,16 +1858,6 @@ class KafkaConfigTest {
 }
   }
 
-  @Test
-  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {

Review Comment:
   Can we add a positive test for multiple log dirs as well, like the one 
created below for a single dir? Maybe refactor the test below to cover both 
scenarios?



##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1590547523


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1073,4 +1151,13 @@ class ZkMigrationIntegrationTest {
     kraftCluster.close()
     zkCluster.stop()
   }
+
+  def maybeRetry(shouldRetry: Boolean, maxWaitMs: Long)(block: => Unit): Unit = {

Review Comment:
   It seems we don't set `shouldRetry` to `false` in this test. Maybe this 
method is unnecessary?



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment:
   Since this flag guards against both 1) no controller and 2) a KRaft 
controller, the name enableEntityConfig`NoController` is a bit unsuitable. 
How about `enableEntityConfigCheck`?



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
 admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
       val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
       assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
       assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
     }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
-      assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
+      val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, "")
+      assertEquals("8640", defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+      assertEquals("4320", broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+      assertEquals("4320", broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     }
   }
 
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment:
   Why remove the retry? It seems that makes `testDualWriteQuotaAndScram` 
unstable.



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {
+      val controllerRegistration = getControllerRegistration match {
+        case Some(registration) => registration
+        case None =>
+          // This case is mainly here to make tests less flaky (by virtue of retries).
+          // In practice, we always expect a /controller ZNode to exist
+          throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+      }
+
+      // If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+      // from the new KRaft controller that lets it know about the new control

Re: [PR] KAFKA-15845: Detect leaked Kafka clients and servers with LeakTestingExtension [kafka]

2024-05-05 Thread via GitHub


github-actions[bot] commented on PR #14783:
URL: https://github.com/apache/kafka/pull/14783#issuecomment-2095133166

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15524, KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-05-05 Thread via GitHub


github-actions[bot] commented on PR #15302:
URL: https://github.com/apache/kafka/pull/15302#issuecomment-2095133015

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15722: KRaft support in RackAwareAutoTopicCreationTest [kafka]

2024-05-05 Thread via GitHub


github-actions[bot] commented on PR #15318:
URL: https://github.com/apache/kafka/pull/15318#issuecomment-2095133001

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-05 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16670:
-
Description: 
*Related Code*
 * Consumer gets its assignment successfully:
 ** 
[https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
 * Consumer gets stuck forever because of a concurrency issue:
 ** 
[https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]

 

*Unexpected behaviour*
 * The broker is sufficiently slow.
 * A KafkaConsumer is created and immediately subscribes to a topic.

If both conditions are met, the {{Consumer}} can potentially never receive 
{{TopicPartition}} assignments and may become indefinitely stuck. In the case 
of a new {{broker}} and a new {{consumer}}, when the consumer is created, the 
{{consumer background thread}} starts to send a request to the broker (I 
believe this is a {{GroupCoordinator Heartbeat request}}). During this time, 
if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it 
will begin to schedule metadata loading. Once the broker has completely loaded 
the metadata, the {{consumer background thread}} recognizes this broker as a 
valid group coordinator. However, there is a possibility that the {{consumer}} 
sends a {{subscribe request}} to the {{broker}} before the {{broker}} has 
replied to the {{GroupCoordinator Heartbeat Request}}. In such a scenario, the 
{{consumer}} appears to be stuck.

 

You can check this scenario in 
{{src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator}}.
If there is no sleep time to wait for the {{GroupCoordinator Heartbeat 
Request}}, the {{consumer}} will always be stuck. If there is a little sleep 
time, the {{consumer}} will always receive its assignment.

 

README : 
[https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md]

 

In my case, the consumer gets its assignment under `docker-compose`, which 
means that environment is not slow enough to trigger the issue.

However, the consumer cannot get an assignment under `testcontainers` without 
a little waiting time, which means that environment is slow enough to cause 
the concurrency issue.

`testcontainers` runs Docker in Docker, so it is slower than `docker-compose`.
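
For illustration, here is a minimal sketch of the reproduction pattern 
described above. The topic name and bootstrap address are hypothetical; the 
actual tests live in the linked repository.

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeRaceRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // the KIP-848 protocol
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribing immediately after construction races with the background
            // thread's group-coordinator discovery when the broker is slow.
            // Thread.sleep(1000); // <- with a short sleep, the assignment always arrives
            consumer.subscribe(List.of("my-topic"));
            while (consumer.assignment().isEmpty()) {
                consumer.poll(Duration.ofMillis(500)); // may loop forever when the race hits
            }
        }
    }
}
{code}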


[jira] [Created] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-05 Thread sanghyeok An (Jira)
sanghyeok An created KAFKA-16670:


 Summary: KIP-848 : Consumer will not receive assignment forever 
because of concurrent issue.
 Key: KAFKA-16670
 URL: https://issues.apache.org/jira/browse/KAFKA-16670
 Project: Kafka
  Issue Type: Bug
Reporter: sanghyeok An


*Related Code*
 * Consumer gets its assignment successfully:
 ** 
[https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
 * Consumer gets stuck forever because of a concurrency issue:
 ** 
https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79

 

*Unexpected behaviour*
([https://github.com/chickenchickenlove/new-consumer-error#unexpected-behaviour])
 * The broker is sufficiently slow.
 * A KafkaConsumer is created and immediately subscribes to a topic.

If both conditions are met, the {{Consumer}} can potentially never receive 
{{TopicPartition}} assignments and may become indefinitely stuck. In the case 
of a new {{broker}} and a new {{consumer}}, when the consumer is created, the 
{{consumer background thread}} starts to send a request to the broker (I 
believe this is a {{GroupCoordinator Heartbeat request}}). During this time, 
if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it 
will begin to schedule metadata loading. Once the broker has completely loaded 
the metadata, the {{consumer background thread}} recognizes this broker as a 
valid group coordinator. However, there is a possibility that the {{consumer}} 
sends a {{subscribe request}} to the {{broker}} before the {{broker}} has 
replied to the {{GroupCoordinator Heartbeat Request}}. In such a scenario, the 
{{consumer}} appears to be stuck.

You can check this scenario in 
{{src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator}}.
If there is no sleep time to wait for the {{GroupCoordinator Heartbeat 
Request}}, the {{consumer}} will always be stuck. If there is a little sleep 
time, the {{consumer}} will always receive its assignment.

 

README : 
https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-05-05 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843599#comment-17843599
 ] 

PoAn Yang commented on KAFKA-16223:
---

Yes, I will try to send a PR ASAP. Thanks.

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-05-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843596#comment-17843596
 ] 

Chia-Ping Tsai commented on KAFKA-16223:


[~yangpoan] Could you please take over the remaining migration? 

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]

2024-05-05 Thread via GitHub


chia7712 merged PR #15841:
URL: https://github.com/apache/kafka/pull/15841


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16659) KafkaConsumer#position() does not respect wakeup when group protocol is CONSUMER

2024-05-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16659.

Fix Version/s: 3.8.0
   Resolution: Fixed

> KafkaConsumer#position() does not respect wakeup when group protocol is 
> CONSUMER
> ---
>
> Key: KAFKA-16659
> URL: https://issues.apache.org/jira/browse/KAFKA-16659
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> see following test
> {code:scala}
>   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   @Timeout(15)
>   def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
>     val topicPartition = new TopicPartition(topic, 15)
>     val consumer = createConsumer()
>     consumer.assign(List(topicPartition).asJava)
>     CompletableFuture.runAsync { () =>
>       TimeUnit.SECONDS.sleep(1)
>       consumer.wakeup()
>     }
>     assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakeup when group protocol is CONSUMER [kafka]

2024-05-05 Thread via GitHub


chia7712 merged PR #15853:
URL: https://github.com/apache/kafka/pull/15853


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakeup when group protocol is CONSUMER [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on PR #15853:
URL: https://github.com/apache/kafka/pull/15853#issuecomment-2095025582

   ```
   ./gradlew cleanTest \
     :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId \
     :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow \
       --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault \
     :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeWithWildcardAcls \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests UserClientIdQuotaTest.testThrottledProducerConsumer \
       --tests UserClientIdQuotaTest.testQuotaOverrideDelete \
       --tests ReplicationQuotasTest.shouldThrottleOldSegments
   ```
   These failures are unrelated, and the tests pass on my local machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on PR #15571:
URL: https://github.com/apache/kafka/pull/15571#issuecomment-2095022237

   I have triggered QA again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590457683


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -44,4 +44,5 @@
 String listener() default "";
 MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
 ClusterConfigProperty[] serverProperties() default {};
+Tags[] tags() default {};

Review Comment:
   Another idea: maybe we don't need a new annotation. We could just have a new 
field `String[] labels() default {}` to enable developers to add custom strings 
to the display name. That is simpler. WDYT? 
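   
   For illustration, a sketch of that alternative (the field name and placement 
are assumptions for discussion, not code from this PR):
   
   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;
   
   @Target(ElementType.METHOD)
   @Retention(RetentionPolicy.RUNTIME)
   public @interface ClusterTest {
       // ...existing fields (brokers, controllers, serverProperties, ...)...
   
       // Free-form strings appended to the generated display name, instead of
       // introducing a separate Tags annotation.
       String[] labels() default {};
   }
   ```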



##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -44,4 +44,5 @@
 String listener() default "";
 MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
 ClusterConfigProperty[] serverProperties() default {};
+Tags[] tags() default {};

Review Comment:
   Could you please add a comment for it?



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -143,23 +144,27 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
     Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
 
     Map<String, String> serverProperties = new HashMap<>();
+    Map<String, String> tags = new HashMap<>();
     for (ClusterConfigProperty property : defaults.serverProperties()) {
         serverProperties.put(property.key(), property.value());
     }
     for (ClusterConfigProperty property : annot.serverProperties()) {
         serverProperties.put(property.key(), property.value());
     }
+    for (Tags tag : annot.tags()) {
+        tags.put(tag.key(), tag.value());
+    }
     ClusterConfig config = ClusterConfig.builder()
             .setType(type)
             .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
             .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
             .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
             .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : annot.autoStart() == AutoStart.YES)
-            .setName(annot.name().trim().isEmpty() ? null : annot.name())
             .setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
             .setServerProperties(serverProperties)
             .setSecurityProtocol(annot.securityProtocol())
             .setMetadataVersion(annot.metadataVersion())
+            .setTags(tags)

Review Comment:
   Maybe a lambda is simpler, for example: 
`setTags(Arrays.stream(annot.tags()).collect(Collectors.toMap(Tags::key, Tags::value)))`



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -83,6 +81,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
     this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
     this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
     this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
+    this.tags = Objects.requireNonNull(extendTags(tags));

Review Comment:
   We should keep the original tags and return the "modified" tags from 
`nameTags`. Also, please add a method (`tags`) that returns the original tags.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1590455114


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleanerManager.removeMetrics()
   }
 
+  /**
+   * Activate metrics
+   */
+  def activateMetrics(): Unit = {
+    metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,

Review Comment:
   Agreed. @chiacyu could you please avoid producing duplicate code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15862:
URL: https://github.com/apache/kafka/pull/15862#discussion_r1590454147


##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest {
     void testProcessClusterTemplate() {
         ClusterTestExtensions ext = new ClusterTestExtensions();
         ExtensionContext context = mock(ExtensionContext.class);
-        Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);
         ClusterTemplate annot = mock(ClusterTemplate.class);
-        when(annot.value()).thenReturn("").thenReturn(" ");
+        when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("test_empty_config");

Review Comment:
   That is a good way. Also, we can do the refactor in 
https://issues.apache.org/jira/browse/KAFKA-16654 later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-05 Thread via GitHub


soarez commented on code in PR #15862:
URL: https://github.com/apache/kafka/pull/15862#discussion_r1590420247


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -119,23 +116,34 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
         return generatedContexts.stream();
     }
 
-    void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
-                                Consumer<TestTemplateInvocationContext> testInvocations) {
+    List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
         // If specified, call cluster config generated method (must be static)
         List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();
+        List<TestTemplateInvocationContext> testTemplateInvocationContexts = new ArrayList<>();
         if (annot.value().trim().isEmpty()) {
             throw new IllegalStateException("ClusterTemplate value can't be empty string.");
         }
         generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
 
-        String baseDisplayName = context.getRequiredTestMethod().getName();
-        generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations));
+        if (context.getRequiredTestMethod() != null) {

Review Comment:
   We shouldn't need to check for null here. Can you remove the condition?
   
   
https://github.com/junit-team/junit5/blob/releases/5.10.x/junit-jupiter-api/src/main/java/org/junit/jupiter/api/extension/ExtensionContext.java#L236



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -119,23 +116,34 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
         return generatedContexts.stream();
     }
 
-    void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
-                                Consumer<TestTemplateInvocationContext> testInvocations) {
+    List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
         // If specified, call cluster config generated method (must be static)
         List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();
+        List<TestTemplateInvocationContext> testTemplateInvocationContexts = new ArrayList<>();
         if (annot.value().trim().isEmpty()) {
             throw new IllegalStateException("ClusterTemplate value can't be empty string.");
         }
         generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
 
-        String baseDisplayName = context.getRequiredTestMethod().getName();
-        generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations));
+        if (context.getRequiredTestMethod() != null) {
+            String baseDisplayName = context.getRequiredTestMethod().getName();
+            generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testTemplateInvocationContexts::add));
+        }
+
+        if (testTemplateInvocationContexts.isEmpty()) {
+            throw new IllegalStateException("ClusterConfig generator method should provide at least one config.");
+        }
+
+        return testTemplateInvocationContexts;
     }
 
     private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
         Object testInstance = context.getTestInstance().orElse(null);
-        Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class);
-        ReflectionUtils.invokeMethod(method, testInstance, generator);
+        Class<?> testClass = context.getRequiredTestClass();
+        if (context.getRequiredTestClass() != null) {

Review Comment:
   Same here. 
   
https://github.com/junit-team/junit5/blob/releases/5.10.x/junit-jupiter-api/src/main/java/org/junit/jupiter/api/extension/ExtensionContext.java#L137
   
   Can you remove the `if`?



##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest {
     void testProcessClusterTemplate() {
         ClusterTestExtensions ext = new ClusterTestExtensions();
         ExtensionContext context = mock(ExtensionContext.class);
-        Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);
         ClusterTemplate annot = mock(ClusterTemplate.class);
-        when(annot.value()).thenReturn("").thenReturn(" ");
+        when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("test_empty_config");

Review Comment:
   I'm wondering if these tests would be simpler to write with actual methods. 
   
   We could define a static inner class:
   
   ```java
   static class StubTest {
       @ClusterTemplate("cfgFoo")
       void testFoo() {}
       static void cfgFoo(ClusterGenerator gen) { /* ... */ }

       @ClusterTemplate("")
       void testBar() {}
   }
   ```
   
   and a utility method:
   
   ```java

Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-05 Thread via GitHub


soarez commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1590415617


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       cleaners += cleaner
       cleaner.start()
     }
+    activateMetrics();

Review Comment:
   Since there is no change to the existing definition of metrics, it seems 
this will cause the metrics to be initialized twice.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleanerManager.removeMetrics()
   }
 
+  /**
+   * Activate metrics
+   */
+  def activateMetrics(): Unit = {
+    metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+      () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+
+    metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
+      val stats = cleaners.map(_.lastStats)
+      val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1)
+      (100 * recopyRate).toInt
+    })
+
+    metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+
+    metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+      () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+
+    metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)

Review Comment:
   If we're moving the metrics here, please keep the existing comments.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       cleaners += cleaner
       cleaner.start()
     }
+    activateMetrics();

Review Comment:
   +1 Please include a test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-05-05 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1590396620


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +474,42 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersion = if (!enableEntityConfigNoController) {

Review Comment:
   We still need to support `kafka-config.sh --zookeeper` until 4.0. 
Previously, it was possible to set configs with the tools even if the cluster 
wasn't running. This flag lets us configure the whole KafkaZkClient in 
CommandConfig to allow this behavior. Otherwise, it's disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-05-05 Thread via GitHub


Phuc-Hong-Tran commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-2094904127

   FYI @JimmyWang6, I have taken over this ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1590370017


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,374 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-                "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    pu

Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-05-05 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-2094886153

   > hi @JimmyWang6, are you still working on this?
   
   @Phuc-Hong-Tran I apologize for the delay in finishing this issue. I will 
continue working on it shortly, and I am sorry for any inconvenience this may 
have caused.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-05 Thread via GitHub


ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1590353376


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -530,6 +530,14 @@ public void 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
 assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
is(false));
 }
 
+@Test
+public void 
shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {

Review Comment:
   Yes, I will make this test extensible for all the consumers. 
   Can you help me with how I can capture the log to verify that the WARN is 
printed? I think it's a good-to-have test case. Is there an existing test that 
already does this? 
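   
   For reference, a minimal sketch of one way to do this with Kafka's 
`LogCaptureAppender` test utility (the API shown is assumed from existing 
Streams tests, and the asserted message text is a placeholder for whatever 
this PR logs):
   
   ```java
   import java.util.Properties;
   
   import org.apache.kafka.common.utils.LogCaptureAppender;
   import org.apache.kafka.streams.StreamsConfig;
   import org.junit.jupiter.api.Test;
   
   import static org.hamcrest.CoreMatchers.containsString;
   import static org.hamcrest.CoreMatchers.hasItem;
   import static org.hamcrest.MatcherAssert.assertThat;
   
   public class WarnLogCaptureTest {
       @Test
       public void shouldLogWarnWhenControlledConfigIsOverridden() {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Hypothetical user override of a Streams-controlled consumer config:
           props.put("consumer.allow.auto.create.topics", "true");
   
           // Capture log output emitted while the consumer configs are resolved.
           try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
               new StreamsConfig(props).getMainConsumerConfigs("group", "client", 0);
               assertThat(appender.getMessages(), hasItem(containsString("allow.auto.create.topics")));
           }
       }
   }
   ```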
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-05 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094870332

   Below are the configs for each client type, indicating for each one whether:
   1. it is a custom default for KS that remains editable by the user, or
   2. it is a fixed value controlled by KS.
   
   ### Producer Configs
   ```markdown
   
   EoS Disabled
   1. [Editable] [CustomDefault] linger.ms = 100
   
   2. [Fixed] partitioner.class = StreamsPartitioner
   
   EoS Enabled
   1. [Editable] [CustomDefault] linger.ms = 100
   2. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX
   3. [Editable] [CustomDefault] transaction.timeout.ms = 1
   
   4. [Fixed] partitioner.class = StreamsPartitioner
   5. [Fixed] enable.idempotence = true
   6. [Fixed] transactional.id = -
   
   7. [Validate] max.in.flight.requests.per.connection <= 5
   
   ```
   
   ### Main Consumer Configs 
   ```markdown
   EoS Disabled 
   1. [Editable] [CustomDefault] auto.offset.reset = earliest
   2. [Editable] [CustomDefault] max.poll.records = 1000
   
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = 
   
   EoS Enabled
   1. [Editable] [CustomDefault] auto.offset.reset = earliest
   2. [Editable] [CustomDefault] max.poll.records = 1000
   
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = 
   ```
   
   ### Global Consumer Configs 
   ```markdown
   EoS Disabled 
   1. [Editable] [CustomDefault] max.poll.records = 1000
   
   2. [Fixed] auto.offset.reset = None
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = None
   
   EoS Enabled
   1. [Editable] [CustomDefault] max.poll.records = 1000
   
   2. [Fixed] auto.offset.reset = None
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = None
   ```
   
   ### Restore Consumer Configs 
   ```markdown
   EoS Disabled 
   1. [Editable] [CustomDefault] max.poll.records = 1000
   
   2. [Fixed] auto.offset.reset = None
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = None
   
   EoS Enabled
   1. [Editable] [CustomDefault] max.poll.records = 1000
   
   2. [Fixed] auto.offset.reset = None
   3. [Fixed] allow.auto.create.topics = false
   4. [Fixed] enable.auto.commit = false
   5. [Fixed] group.id = None
   ```
   
   There are a few more that are coded ad-hoc within the code that I haven't 
included. Seemed like a broader change for Streams Configs. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-05-05 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094869760

   Hi @mjsax, apologies for my long absence from this PR. I have gone ahead and 
implemented the changes. The tests are pending and I am currently working on 
them. The implementation is detailed below.
   
   There are two pieces that KS controls -
   1. Custom Default - Configs that have custom default values for KS compared 
to the actual defaults. These values can also be overwritten by the user.
   2. Controlled Configs - Configs that are controlled by KS and cannot be 
overwritten by the user (We want to warn the user that this value is being 
overwritten if set by the user)
   
   ### Previous Implementation -
   1. We used to have the following data structures
   ```java
   String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS // Controlled KS Consumer Configs
   String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIG // Controlled KS Consumer Configs when EoS enabled
   
   String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS // Controlled KS Producer Configs when EoS enabled
   
   Map<String, Object> PRODUCER_DEFAULT_OVERRIDES // Producer Custom Default + Controlled Config Values
   Map<String, Object> PRODUCER_EOS_OVERRIDES // Producer Custom Default + Controlled Config Values with EoS
   
   Map<String, Object> CONSUMER_DEFAULT_OVERRIDES // Consumer Custom Default + Controlled Config Values
   Map<String, Object> CONSUMER_EOS_OVERRIDES // Consumer Custom Default + Controlled Config Values with EoS
   ```
   
   2. The steps to return the required config broadly were:
   1. **Get client configs**: Gather client configurations with prefixes 
either `consumer.` or `producer.` and put them in `clientProvidedProps`.
   2. **Clean `clientProvidedProps`**: Use the method 
`checkIfUnexpectedUserSpecifiedConsumerConfig` to tidy up `clientProvidedProps`.
   3. **Create `props`**: Generate `props` using either 
`<>_DEFAULT_OVERRIDES` or `<>_EOS_OVERRIDES`.
   4. **Overwrite `props`**: Replace `props` with the cleaned 
`clientProvidedProps`.
   5. **Fetch additional configs (only for consumer props)**: If it's 
consumer props, fetch configurations set using `main.consumer.`, 
`global.consumer.`, or `restore.consumer.` and add them to the `props` map.
   
   3. After the initial setup, we make some tweaks based on whether it's for a 
consumer or producer, and then we hand back the props.
   
   
   ### Current Implementation -
   1. Do away with the old data structures and define the following new ones -
   ```java
   Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS // KS Custom Defaults for Producer
   Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED // KS Custom Defaults for Producer with EoS
   Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS // KS Controlled Configs for Producer
   Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Producer with EoS
   
   Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS // KS Custom Defaults for Consumer
   Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS // KS Controlled Configs for Consumer
   Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Consumer with EoS
   ```
   
   2. The steps to return the required config now are:
   1. **Get client configs**: Obtain client configurations with prefixes 
either `consumer.` or `producer.` and place them in `clientProvidedProps`.
   2. **Create `props`**: Generate `props` using either 
`KS_DEFAULT_<>_CONFIGS` or `KS_DEFAULT_<>_CONFIGS_EOS_ENABLED`.
   3. **Overwrite `props`**: Replace `props` with the cleaned 
`clientProvidedProps`.
   4. **Fetch additional configs (only for consumer props)**: If it's 
consumer props, fetch configurations set using `main.consumer.`, 
`global.consumer.`, or `restore.consumer.` and add them to the `props` map.
   5. **Run a validation check over `props`**: Use the `KS_CONTROLLED_<>_CONFIGS` 
or `KS_CONTROLLED_<>_CONFIGS_EOS_ENABLED` maps to check whether those values are 
already set in `props`. If they are, log a warning and overwrite them; if not, 
add them to `props` (see the sketch after this list).
   
   3. After the initial setup, we make some tweaks based on whether it's for a 
consumer or producer, and then we hand back the props.
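   A minimal sketch of the validation check in step 5 above, assuming the new 
`KS_CONTROLLED_<>_CONFIGS` maps are `Map<String, Object>`; the class and method 
names here are illustrative, not the PR's code:
   ```java
   import java.util.Map;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   final class ControlledConfigCheck {
       private static final Logger log = LoggerFactory.getLogger(ControlledConfigCheck.class);
   
       // If the user set a controlled config to a different value, warn and overwrite it;
       // if the config is absent, simply add the controlled value.
       static void enforce(final Map<String, Object> props, final Map<String, Object> controlled) {
           controlled.forEach((name, enforced) -> {
               final Object userValue = props.get(name);
               if (userValue != null && !userValue.equals(enforced)) {
                   log.warn("Config '{}' is controlled by Kafka Streams; user value '{}' will be replaced by '{}'",
                           name, userValue, enforced);
               }
               props.put(name, enforced);
           });
       }
   }
   ```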
   
   
   Below, I'll share the configurations organized into custom defaults and 
controlled configs for both consumers and producers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]

2024-05-05 Thread via GitHub


hgeraldino commented on PR #15841:
URL: https://github.com/apache/kafka/pull/15841#issuecomment-2094865173

   Thanks @chia7712 for your comments. 
   
   This PR migrates another 13 test cases to Mockito; I expect the remaining 12 
cases to be covered in one final pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16669) Remove extra collection copy when generating DescribeAclsResource

2024-05-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16669:
--

Assignee: Chia Chuan Yu  (was: Chia-Ping Tsai)

> Remove extra collection copy when generating DescribeAclsResource
> 
>
> Key: KAFKA-16669
> URL: https://issues.apache.org/jira/browse/KAFKA-16669
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Trivial
>
> There are three collection copies happening in generating DescribeAclsResource
> 1. Iterable -> HashSet 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72)
> 2. HashSet -> Map 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141)
> 3. Map -> List 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146)
> We can do two small optimizations:
> 1. remove the first collection copy. This optimization needs two steps: a) 
> change `aclsResources` input type from `Collection` to `Iterable`. b) 
> de-duplicate in the second collection copy: HashSet -> Map. We use `Set` 
> to replace the `List`
> 2. set the array size. 
> https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16669) Remove extra collection copy when generating DescribeAclsResource

2024-05-05 Thread Chia Chuan Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843570#comment-17843570
 ] 

Chia Chuan Yu commented on KAFKA-16669:
---

Hi, [~chia7712] 

Can I have this one? Thanks!

> Remove extra collection copy when generating DescribeAclsResource
> 
>
> Key: KAFKA-16669
> URL: https://issues.apache.org/jira/browse/KAFKA-16669
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Trivial
>
> There are three collection copies happening in generating DescribeAclsResource
> 1. Iterable -> HashSet 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72)
> 2. HashSet -> Map 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141)
> 3. Map -> List 
> (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146)
> We can do two small optimizations:
> 1. remove the first collection copy. This optimization needs two steps: a) 
> change `aclsResources` input type from `Collection` to `Iterable`. b) 
> de-duplicate in the second collection copy: HashSet -> Map. We use `Set` 
> to replace the `List`
> 2. set the array size. 
> https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15865:
URL: https://github.com/apache/kafka/pull/15865#discussion_r1590318726


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##
@@ -160,7 +155,7 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier kafkaLog) {
-        this(time, converter);
+        this(time, converter, null, "connect-distributed-");

Review Comment:
   This constructor is used only by tests. It sets `topicAdminSupplier` to null, 
so we have to handle a "null" `topicAdminSupplier` purely for testing, which is 
a bit awkward to me. Could we require those test cases to pass a 
`topicAdminSupplier` instead of `null`? Those tests can pass a fake 
`topicAdminSupplier` to the constructor if they expect that it should not be 
called during testing.
   
   
https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java#L137
 
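   For illustration, a fake supplier along those lines might look like the 
following; this is a hypothetical test snippet, not code from this PR:
   ```java
   import java.util.function.Supplier;
   import org.apache.kafka.connect.util.TopicAdmin;
   
   // Fails loudly if the store ever requests a TopicAdmin,
   // instead of relying on null-handling in production code.
   final Supplier<TopicAdmin> failingTopicAdminSupplier = () -> {
       throw new AssertionError("TopicAdmin should not be created in this test");
   };
   ```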



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14785: Connect offset read REST API [kafka]

2024-05-05 Thread via GitHub


yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1590316538


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -141,24 +145,13 @@ private static String noClientId() {
     protected KafkaBasedLog<byte[], byte[]> offsetLog;
     // Visible for testing
     final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
+    private final Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
+    private Converter keyConverter;
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private final Supplier<String> clientIdBase;
     private SharedTopicAdmin ownTopicAdmin;
     protected boolean exactlyOnce;
 
-    /**
-     * Create an {@link OffsetBackingStore} backed by a Kafka topic. This constructor will cause the
-     * store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)}
-     * and {@link #stop()}, respectively.
-     *
-     * @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead
-     */
-    @Deprecated
-    public KafkaOffsetBackingStore() {

Review Comment:
   @chia7712 I've raised this PR to remove the other two deprecated 
constructors - https://github.com/apache/kafka/pull/15865 (and also added some 
context in the PR on why deprecation was introduced rather than removal even 
though these classes are not in the public API).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16669) Remove extra collection copy when generating DescribeAclsResource

2024-05-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16669:
--

 Summary: Remove extra collection copy when generating DescribeAclsResource
 Key: KAFKA-16669
 URL: https://issues.apache.org/jira/browse/KAFKA-16669
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


There are three collection copies happening in generating DescribeAclsResource

1. Iterable -> HashSet 
(https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72)

2. HashSet -> Map 
(https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141)

3. Map -> List 
(https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146)

We can do two small optimizations:

1. remove the first collection copy. This optimization needs two steps: a) 
change `aclsResources` input type from `Collection` to `Iterable`. b) 
de-duplicate in the second collection copy: HashSet -> Map. We use `Set` 
to replace the `List`

2. set the array size. 
https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148
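A hedged sketch of optimization 1, with simplified placeholder types rather than the real AclBinding/DescribeAclsResource classes: group an Iterable directly into a Map of Sets, de-duplicating in a single pass instead of first copying into an intermediate HashSet.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class GroupingSketch {
    // One pass over the Iterable; the Set values de-duplicate as we insert.
    static <K, V> Map<K, Set<V>> groupDirectly(Iterable<Map.Entry<K, V>> bindings) {
        final Map<K, Set<V>> grouped = new HashMap<>();
        for (final Map.Entry<K, V> binding : bindings) {
            grouped.computeIfAbsent(binding.getKey(), k -> new HashSet<>()).add(binding.getValue());
        }
        return grouped;
    }
}
```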




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]

2024-05-05 Thread via GitHub


yashmayya commented on PR #15865:
URL: https://github.com/apache/kafka/pull/15865#issuecomment-2094813303

   Context - https://github.com/apache/kafka/pull/13434#discussion_r1590212654


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-05 Thread via GitHub


gaurav-narula commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1590315633


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   cleaners += cleaner
   cleaner.start()
 }
+    activateMetrics();

Review Comment:
   Would be useful to add the test in the JIRA for posterity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-05 Thread via GitHub


gaurav-narula commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1590313031


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 cleanerManager.removeMetrics()

Review Comment:
   I reckon the metrics in `LogCleanerManager` would remain removed? Perhaps 
they shouldn't be removed while reconfiguring?



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   cleaners += cleaner
   cleaner.start()
 }
+    activateMetrics();

Review Comment:
   Would be useful to add the test in the original JIRA for posterity.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 cleanerManager.removeMetrics()
   }
 
+  /**
+   * Activate metrics
+   */
+  def activateMetrics(): Unit = {
+    metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,

Review Comment:
   Perhaps we can remove the declarations in the class fields above, around 
line 130?
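   A minimal sketch of the lifecycle being discussed, using a hypothetical 
registry rather than Kafka's actual `KafkaMetricsGroup` API: keeping all gauge 
registration inside `activateMetrics()` means a reconfigure cycle (shutdown, 
`removeMetrics()`, startup, `activateMetrics()`) restores every gauge, while 
field-level registration runs only once per instance.
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Supplier;
   
   final class GaugeLifecycle {
       private final Map<String, Supplier<?>> registry = new ConcurrentHashMap<>();
   
       // Called on every startup, including restarts triggered by reconfiguration.
       void activateMetrics() {
           registry.put("max-buffer-utilization-percent", () -> 0); // placeholder gauge
       }
   
       // Called on shutdown; without re-activation the gauges would stay gone.
       void removeMetrics() {
           registry.clear();
       }
   }
   ```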



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-05 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1590274476


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
+
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt

Review Comment:
   I used `AtomicLong` to practically rule out an overflow, but then found 
`Utils.toPositive`, which is used by `RoundRobinPartitioner` :) Updated to use 
an `AtomicInteger` and also added some tests to ensure round-robin allocation.
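   For reference, a sketch of the overflow-safe round-robin selection mentioned 
above; `Utils.toPositive` in Kafka masks the sign bit rather than calling 
`Math.abs`, which would misbehave at `Integer.MIN_VALUE`. The class name and 
shard-count parameter below are illustrative, not this PR's code:
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   final class RoundRobinShards {
       private final AtomicInteger counter = new AtomicInteger(0);
   
       // Equivalent to Utils.toPositive(counter.getAndIncrement()) % numShards.
       int nextShard(final int numShards) {
           return (counter.getAndIncrement() & 0x7fffffff) % numShards;
       }
   }
   ```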



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590273427


##
core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java:
##
@@ -42,4 +42,5 @@
 boolean autoStart() default true;
 // Set default server properties for all @ClusterTest(s)
 ClusterConfigProperty[] serverProperties() default {};
+ClusterConfigDisplayTag[] displayTags() default {};

Review Comment:
   do you mean putting the tags in `ClusterTestDefaults`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu commented on PR #15861:
URL: https://github.com/apache/kafka/pull/15861#issuecomment-2094744565

   Updated, and `./gradlew clean core:test --tests ClusterTestExtensionsTest 
--tests ClusterConfigTest` passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Wall-Clock based Windowing / Suppression [kafka]

2024-05-05 Thread via GitHub


eichingertim opened a new pull request, #15864:
URL: https://github.com/apache/kafka/pull/15864

   The change adds the option to suppress events during windowing based on 
WALL_CLOCK_TIME instead of STREAM_CLOCK_TIME.
   
   **Problem Statement**:
   In our use case we needed this functionality because the number of events 
varies greatly. Sometimes we receive so few events that a result is never 
emitted, because no new event arrives to advance STREAM_CLOCK_TIME.
   
   **Solution**:
   We implemented the option to use WALL_CLOCK_TIME instead of STREAM_CLOCK_TIME 
for suppression during windowing. It is based on a scheduled operation that 
punctuates on each wall-clock interval.
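   To illustrate the general idea, a hedged sketch of wall-clock punctuation 
follows; this is not the PR's implementation, and the buffering strategy and 
interval are assumptions:
   ```java
   import java.time.Duration;
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   // Buffers the latest record per key and flushes on a wall-clock punctuation,
   // so results are emitted even when no new events arrive to advance stream time.
   class WallClockSuppressProcessor<K, V> implements Processor<K, V, K, V> {
       private final Map<K, Record<K, V>> buffer = new HashMap<>();
   
       @Override
       public void init(final ProcessorContext<K, V> context) {
           context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, now -> {
               buffer.values().forEach(context::forward);
               buffer.clear();
           });
       }
   
       @Override
       public void process(final Record<K, V> record) {
           buffer.put(record.key(), record); // keep only the latest value per key
       }
   }
   ```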
   
   
   **Testing**:
   For testing we extended the 
`streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java`
 with a new scenario where the suppression is based on wall clock time.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590270528


##
core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Target({ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ClusterConfigDisplayTag {

Review Comment:
   see https://github.com/apache/kafka/pull/15861#discussion_r1590270405



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590270405


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() {
     public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
         return perBrokerOverrideProperties;
     }
+    public Map<String, String> displayTags() {

Review Comment:
   > If they are the same concept, let me rename it and combine them together
   
   It seems to me that one "tags" method is enough. That method returns the 
"modified" tags (raw + others).

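   A sketch of that single-accessor idea; the field names are illustrative, not 
the actual `ClusterConfig` members:
   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   final class TaggedConfig {
       private final Map<String, String> rawTags;   // user-supplied tags
       private final String metadataVersion;        // example source of a derived tag
   
       TaggedConfig(final Map<String, String> rawTags, final String metadataVersion) {
           this.rawTags = rawTags;
           this.metadataVersion = metadataVersion;
       }
   
       // One method: raw tags first, derived tags layered on top.
       Map<String, String> tags() {
           final Map<String, String> all = new LinkedHashMap<>(rawTags);
           all.put("MetadataVersion", metadataVersion);
           return Collections.unmodifiableMap(all);
       }
   }
   ```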


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590267358


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() {
     public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
         return perBrokerOverrideProperties;
     }
+    public Map<String, String> displayTags() {

Review Comment:
   I thought that `nameTags` was for other purposes, so I separated them and 
used a different name for the tags I am adding. If they are the same concept, 
let me rename them and combine them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590267137


##
core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Target({ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ClusterConfigDisplayTag {

Review Comment:
   Another `nameTags` already exists, and I was not sure whether it means 
something else. To avoid mixing them up, I used `DisplayTag`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590266910


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -44,4 +44,5 @@
 String listener() default "";
 MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
 ClusterConfigProperty[] serverProperties() default {};
+ClusterConfigDisplayTag[] displayTags() default {};

Review Comment:
   I was wondering whether it is used by anyone now, but I just checked and 
found that it is only used in tests. Let me remove it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1590262208


##
core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java:
##
@@ -42,4 +42,5 @@
 boolean autoStart() default true;
 // Set default server properties for all @ClusterTest(s)
 ClusterConfigProperty[] serverProperties() default {};
+ClusterConfigDisplayTag[] displayTags() default {};

Review Comment:
   Could we add this to the default annotation only if we have such a use case? 



##
core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Target({ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ClusterConfigDisplayTag {

Review Comment:
   Maybe call it `Tag`? `ClusterConfigDisplayTag` is a bit verbose to me



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() {
     public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
         return perBrokerOverrideProperties;
     }
+    public Map<String, String> displayTags() {

Review Comment:
   Please update `nameTags`



##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -44,4 +44,5 @@
 String listener() default "";
 MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
 ClusterConfigProperty[] serverProperties() default {};
+ClusterConfigDisplayTag[] displayTags() default {};

Review Comment:
   We can remove the `name` field now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-05 Thread via GitHub


TaiJuWu opened a new pull request, #15862:
URL: https://github.com/apache/kafka/pull/15862

   Refactors processClusterTemplate and adds a unit test for a ClusterTemplate 
that offers zero ClusterConfig.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [WIP] Kafka-16668: improve cluster test [kafka]

2024-05-05 Thread via GitHub


johnnychhsu opened a new pull request, #15861:
URL: https://github.com/apache/kafka/pull/15861

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org