[PR] KAFKA-16348 [WIP]: add logs to observe [kafka]

2024-03-08 Thread via GitHub


johnnychhsu opened a new pull request, #15503:
URL: https://github.com/apache/kafka/pull/15503

   ## Context
   
TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress is flaky; I would like to fix it.
   Jira ticket: [KAFKA-16348](https://issues.apache.org/jira/browse/KAFKA-16348)
   
   ## Solution
   Add some logs first so we can observe the failing runs.
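   
   Illustrative only -- the kind of state logging such a WIP adds around the flaky assertion; the class and field names below are placeholders, not the PR's actual lines:
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   class ReassignmentObservation {
       private static final Logger LOG = LoggerFactory.getLogger(ReassignmentObservation.class);
   
       // Log the inputs the flaky assertion depends on, so failing CI runs
       // leave enough context to diagnose the race.
       void logDescribeState(String topic, boolean reassignmentInProgress, int underReplicatedPartitions) {
           LOG.info("describe state: topic={} reassignmentInProgress={} underReplicatedPartitions={}",
               topic, reassignmentInProgress, underReplicatedPartitions);
       }
   }
   ```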
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16346: Fix flaky MetricsTest.testMetrics [kafka]

2024-03-08 Thread via GitHub


FrankYang0529 opened a new pull request, #15502:
URL: https://github.com/apache/kafka/pull/15502

   The `MessageConversionsTimeMs` value is computed with `Math.round` before the metric is updated, so it can be zero if the conversion runs fast enough.
   
   Modify the update logic to only update the metric when `messageConversionsTimeMs` is greater than 0.
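   
   A minimal sketch of that guarded-update pattern, with stand-in names rather than the broker's actual metric classes:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class MessageConversionsMetricSketch {
       private final List<Long> samples = new ArrayList<>(); // stand-in for a histogram
   
       // Only record the conversion time when it rounds to a positive value,
       // so a sub-millisecond conversion cannot record a misleading zero.
       void maybeRecord(double conversionTimeMs) {
           long rounded = Math.round(conversionTimeMs);
           if (rounded > 0) {
               samples.add(rounded);
           }
       }
   }
   ```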
   
   Modify the unit test to check whether the metric is updated.
   
   I ran the test 100 times and saw no flaky results.
   
   ```
   I=1; while [ $I -le 100 ] && ./gradlew cleanTest core:test --tests MetricsTest.testMetrics; do echo "Completed run: $I"; (( I=$I+1 )); sleep 1; done
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16146: Checkpoint log-start-offset for remote log enabled topics [kafka]

2024-03-08 Thread via GitHub


kamalcph commented on PR #15201:
URL: https://github.com/apache/kafka/pull/15201#issuecomment-1986704478

   Rebased the patch on trunk. The failed tests are unrelated.
   
   cc @junrao @satishd 





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1518449958


##
core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala:
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.api.IntegrationTestHarness
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.{Collections, Properties}
+
+class OffsetOfMaxTimestampTest extends IntegrationTestHarness {
+  @AfterEach
+  override def tearDown(): Unit = {
+    TestUtils.shutdownServers(brokers)
+    super.tearDown()
+  }
+
+  override def brokerCount: Int = 1
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWithNoCompression(quorum: String): Unit = {
+    test(false)
+  }
+
+  private def test(useCompression: Boolean): Unit = {
+    val topicName: String = "OffsetOfMaxTimestampTest-" + System.currentTimeMillis()
+
+    val admin: Admin = Admin.create(adminClientConfig)
+    try {
+      admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1.toShort)
+        .configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none"))))
+      val props: Properties = new Properties
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+      props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none")
+      val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
+      try {
+        val time: Long = 1
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 100, null, "val20"))
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 400, null, "val15"))
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 250, null, "val15"))

Review Comment:
   https://github.com/apache/kafka/pull/15474#discussion_r1518055609 can 
address that.






[PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-03-08 Thread via GitHub


OmniaGM opened a new pull request, #15501:
URL: https://github.com/apache/kafka/pull/15501

   Moving KafkaConfig props out of core. The PR is big; however, it's mostly renaming the props to match Java constant patterns and switching to `org.apache.kafka.server.config.KafkaConfig` instead of the companion object `kafka.server.KafkaConfig`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518419803


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -417,8 +422,14 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
         long lastOffset = offsetCounter.value - 1;
         firstBatch.setLastOffset(lastOffset);
 
-        if (timestampType == TimestampType.LOG_APPEND_TIME)
+        if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+                offsetOfMaxTimestamp = lastOffset;

Review Comment:
   Hmm, we should be consistent among the different magic values, right? It seems the semantics of MAX_TIMESTAMP are to return the first offset with the max timestamp.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


junrao commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1986585552

   Also, https://github.com/apache/kafka/pull/15476#discussion_r1518413083 applies here too. We just need to fix it in one of the PRs.





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-08 Thread via GitHub


junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1518422852


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
-        }
-
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+                offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   Hmm, we should be consistent among the different magic values, right? It seems the semantics of MAX_TIMESTAMP are to return the first offset with the max timestamp.






Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-08 Thread via GitHub


junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1518420555


##
core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala:
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.api.IntegrationTestHarness
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.{Collections, Properties}
+
+class OffsetOfMaxTimestampTest extends IntegrationTestHarness {
+  @AfterEach
+  override def tearDown(): Unit = {
+    TestUtils.shutdownServers(brokers)
+    super.tearDown()
+  }
+
+  override def brokerCount: Int = 1
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWithNoCompression(quorum: String): Unit = {
+    test(false)
+  }
+
+  private def test(useCompression: Boolean): Unit = {
+    val topicName: String = "OffsetOfMaxTimestampTest-" + System.currentTimeMillis()
+
+    val admin: Admin = Admin.create(adminClientConfig)
+    try {
+      admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1.toShort)
+        .configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none"))))
+      val props: Properties = new Properties
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+      props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none")
+      val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
+      try {
+        val time: Long = 1
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 100, null, "val20"))
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 400, null, "val15"))
+        producer.send(new ProducerRecord[String, String](topicName, 0, time + 250, null, "val15"))

Review Comment:
   How do we make sure that the 3 records are in the same batch?
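   
   One way to arrange that (anticipating the batching discussion later in this thread): a large `batch.size` plus a long `linger.ms`, then a single `flush()`. A sketch assuming a local broker, not the test's actual setup:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class SingleBatchSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 << 20); // plenty of room for three small records
           props.put(ProducerConfig.LINGER_MS_CONFIG, 60_000);   // nothing is sent until flush()
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.send(new ProducerRecord<>("topic", 0, 101L, null, "val20"));
               producer.send(new ProducerRecord<>("topic", 0, 401L, null, "val15"));
               producer.send(new ProducerRecord<>("topic", 0, 251L, null, "val15"));
               producer.flush(); // all three leave together in one produce request / batch
           }
       }
   }
   ```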






Re: [PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]

2024-03-08 Thread via GitHub


jolshan commented on PR #15454:
URL: https://github.com/apache/kafka/pull/15454#issuecomment-1986575283

   Can you check `testTransactionCoordinatorResolution()`?





Re: [PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15454:
URL: https://github.com/apache/kafka/pull/15454#discussion_r1518416635


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -98,19 +99,20 @@ class AddPartitionsToTxnManagerTest {
     when(partitionFor.apply(transactionalId1)).thenReturn(0)
     when(partitionFor.apply(transactionalId2)).thenReturn(1)
     when(partitionFor.apply(transactionalId3)).thenReturn(0)
-    when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName))
-      .thenReturn(Seq(
-        new MetadataResponseData.MetadataResponseTopic()
-          .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-          .setPartitions(List(
-            new MetadataResponseData.MetadataResponsePartition()
-              .setPartitionIndex(0)
-              .setLeaderId(0),
-            new MetadataResponseData.MetadataResponsePartition()
-              .setPartitionIndex(1)
-              .setLeaderId(1)
-          ).asJava)
-      ))
+    when(metadataCache.getPartitionInfo(Topic.TRANSACTION_STATE_TOPIC_NAME, 0))

Review Comment:
   I wonder if we could make a helper method for all of these.






Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-08 Thread via GitHub


junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1518413083


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
-        }
-
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+                offsetOfMaxTimestamp = offsetCounter.value - 1;
+            } else {
+                offsetOfMaxTimestamp = initialOffset;

Review Comment:
   We pass `offsetOfMaxTimestamp` to `ValidationResult.shallowOffsetOfMaxTimestampMs`. Using the shallow offset used to be OK, but when we added `MAX_TIMESTAMP` support for ListOffsets, it became important for `offsetOfMaxTimestamp` to be at the exact record level instead of the shallow batch level. So, we need to rename `ValidationResult.shallowOffsetOfMaxTimestampMs` to make that clear.
   
   Also, there are other places, like `LogSegment.append()`, where we use the name `shallowOffsetOfMaxTimestamp`. We will need to rename those too.
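   
   To illustrate the distinction with stand-in types (not the Kafka internals): the shallow value is just the batch's last offset, while ListOffsets MAX_TIMESTAMP needs the offset of the individual record carrying the max timestamp.
   
   ```java
   import java.util.List;
   
   class ShallowVsRecordLevelSketch {
       record Rec(long offset, long timestamp) {}
   
       // Batch-level ("shallow"): simply the last offset in the batch.
       static long shallowOffset(List<Rec> batch) {
           return batch.get(batch.size() - 1).offset();
       }
   
       // Record-level: the first offset whose record carries the max timestamp,
       // which is what ListOffsets MAX_TIMESTAMP must return.
       static long offsetOfMaxTimestamp(List<Rec> batch) {
           Rec best = batch.get(0);
           for (Rec r : batch) {
               if (r.timestamp() > best.timestamp()) best = r; // strict '>' keeps the first
           }
           return best.offset();
       }
   }
   ```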






Re: [PR] Fix incorrect syntax for config [kafka]

2024-03-08 Thread via GitHub


mjsax merged PR #15500:
URL: https://github.com/apache/kafka/pull/15500





Re: [PR] Fix incorrect syntax for config [kafka]

2024-03-08 Thread via GitHub


mjsax commented on code in PR #15500:
URL: https://github.com/apache/kafka/pull/15500#discussion_r1518395847


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -285,8 +285,10 @@ public class ProducerConfig extends AbstractConfig {
             "" +
             "If not set, the default partitioning logic is used. " +
             "This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
-            " 1) If no partition is specified but a key is present, choose a partition based on a hash of the key." +
-            " 2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." +
+            "" +
+            "If no partition is specified but a key is present, choose a partition based on a hash of the key." +

Review Comment:
   ```suggestion
               "If no partition is specified but a key is present, choose a partition based on a hash of the key." +
   ```



##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -285,8 +285,10 @@ public class ProducerConfig extends AbstractConfig {
             "" +
             "If not set, the default partitioning logic is used. " +
             "This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
-            " 1) If no partition is specified but a key is present, choose a partition based on a hash of the key." +
-            " 2) If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." +
+            "" +
+            "If no partition is specified but a key is present, choose a partition based on a hash of the key." +
+            "If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." +

Review Comment:
   ```suggestion
               "If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition." +
   ```






Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-08 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-1986494220

   @chia7712 The PR is merged with the latest trunk and ready for review. Please take a look.





Re: [PR] Fix incorrect syntax for config [kafka]

2024-03-08 Thread via GitHub


joel-hamill commented on PR #15500:
URL: https://github.com/apache/kafka/pull/15500#issuecomment-1986393715

   cc @cherylws 





[PR] Fix incorrect syntax for config [kafka]

2024-03-08 Thread via GitHub


joel-hamill opened a new pull request, #15500:
URL: https://github.com/apache/kafka/pull/15500

   Fixed incorrect syntax that renders improperly in the downstream output.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16358) Update Connect Transformation documentation

2024-03-08 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-16358:
-
Description: 
When reading the [Kafka Connect 
docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
for transformations, there are a few gaps that should be covered:
 * The Flatten, Cast and TimestampConverter transformations are not listed
 * HeadersFrom should be HeaderFrom
 * -InsertHeader is not documented-

  was:
When reading the [Kafka Connect 
docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
for transformations, there are a few gaps that should be covered:
 * The Flatten, Cast and TimestampConverter transformations are not listed
 * HeadersFrom should be HeaderFrom
 * InsertHeader is not documented

Should be relatively easy to fix


> Update Connect Transformation documentation
> ---
>
> Key: KAFKA-16358
> URL: https://issues.apache.org/jira/browse/KAFKA-16358
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> When reading the [Kafka Connect 
> docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
> for transformations, there are a few gaps that should be covered:
>  * The Flatten, Cast and TimestampConverter transformations are not listed
>  * HeadersFrom should be HeaderFrom
>  * -InsertHeader is not documented-





Re: [PR] MINOR: fix flaky EosIntegrationTest [kafka]

2024-03-08 Thread via GitHub


mjsax commented on PR #15494:
URL: https://github.com/apache/kafka/pull/15494#issuecomment-1986339244

   Oh dear... I committed some of these changes on the wrong branch, so they went in with a totally unrelated PR: https://github.com/apache/kafka/commit/b9a5b4a8053c1fa65e27a9f93440194b0dd5eec4#diff-a17eb0da226d839a072c4b906a03afa3525d9a75a833a49331dc8fc31843d4ac
   
   Will rebase this one, but it will only contain a few more improvements... My bad...





[PR] KAFKA-16358 Update Connect Transformation docs [kafka]

2024-03-08 Thread via GitHub


hgeraldino opened a new pull request, #15499:
URL: https://github.com/apache/kafka/pull/15499

   Minor changes to the docs that:
   * Sort the transformations by name
   * Add the missing `Cast`, `TimestampConverter` and `Flatten` transformations to the list
   * Add anchor links for each transformation
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-1986332968

   @nizhikov please rebase it. I feel we can complete this hard work this week 
:)





Re: [PR] KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on PR #15465:
URL: https://github.com/apache/kafka/pull/15465#issuecomment-1986332391

   @nizhikov nice patch. Let us go back to #14471 :)





Re: [PR] KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-08 Thread via GitHub


chia7712 merged PR #15465:
URL: https://github.com/apache/kafka/pull/15465





[jira] [Assigned] (KAFKA-16358) Update Connect Transformation documentation

2024-03-08 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino reassigned KAFKA-16358:


Assignee: Hector Geraldino

> Update Connect Transformation documentation
> ---
>
> Key: KAFKA-16358
> URL: https://issues.apache.org/jira/browse/KAFKA-16358
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> When reading the [Kafka Connect 
> docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
> for transformations, there are a few gaps that should be covered:
>  * The Flatten, Cast and TimestampConverter transformations are not listed
>  * HeadersFrom should be HeaderFrom
>  * InsertHeader is not documented
> Should be relatively easy to fix





[jira] [Created] (KAFKA-16358) Update Connect Transformation documentation

2024-03-08 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-16358:


 Summary: Update Connect Transformation documentation
 Key: KAFKA-16358
 URL: https://issues.apache.org/jira/browse/KAFKA-16358
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Hector Geraldino


When reading the [Kafka Connect 
docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
for transformations, there are a few gaps that should be covered:
 * The Flatten, Cast and TimestampConverter transformations are not listed
 * HeadersFrom should be HeaderFrom
 * InsertHeader is not documented

Should be relatively easy to fix





Re: [PR] KAFKA-10892: Shared Readonly State Stores ( revisited ) [kafka]

2024-03-08 Thread via GitHub


mjsax commented on PR #12742:
URL: https://github.com/apache/kafka/pull/12742#issuecomment-1986241410

   Merged to `trunk`. Thanks for the KIP and PR @calmera!





Re: [PR] KAFKA-10892: Shared Readonly State Stores ( revisited ) [kafka]

2024-03-08 Thread via GitHub


mjsax merged PR #12742:
URL: https://github.com/apache/kafka/pull/12742





Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on PR #15486:
URL: https://github.com/apache/kafka/pull/15486#issuecomment-1986154766

   I took a quick skim of the tests -- just wanted to confirm we have a test 
for the old produce request + invalid txn state returned?





Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518065460


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -49,7 +49,8 @@ object AddPartitionsToTxnManager {
  */
class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
                                  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback],
-                                 val startTimeMs: mutable.Map[String, Long])
+                                 val startTimeMs: mutable.Map[String, Long],
+                                 val produceRequestVersion: Short)

Review Comment:
   Nice, this is what I was thinking.
   I noticed that this is a bit tricky for the offset commit path. We will want a given request version there too, but the versions don't map quite the same. I will think a bit about how to address this.






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518067902


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig,
       transactionalId,
       producerId,
       producerEpoch,
-      generalizedCallback
+      generalizedCallback,
+      ApiKeys.PRODUCE.latestVersion

Review Comment:
   Hmm this is a bit interesting since this path is usually exercised by the 
group coordinator. We may want to do something else, or leave a comment here.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518065927


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {
Review Comment:
   > So while it is ok to configure via the flush method -- let's make the 
record size smaller. We should confirm the batching is as expected when we run 
the tests. :)
   
   +1 to this comment  






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518065460


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -49,7 +49,8 @@ object AddPartitionsToTxnManager {
  */
class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
                                  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback],
-                                 val startTimeMs: mutable.Map[String, Long])
+                                 val startTimeMs: mutable.Map[String, Long],
+                                 val produceRequestVersion: Short)

Review Comment:
   Nice, this is what I was thinking.






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518064096


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -228,11 +232,17 @@ class AddPartitionsToTxnManager(
             val tp = new TopicPartition(topicResult.name, partitionResult.partitionIndex)
             if (partitionResult.partitionErrorCode != Errors.NONE.code) {
               // Producers expect to handle INVALID_PRODUCER_EPOCH in this scenario.
-              val code =
+              var code =
                 if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
                   Errors.INVALID_PRODUCER_EPOCH.code
                 else
                   partitionResult.partitionErrorCode
+              // For backward compatibility with clients.

Review Comment:
   Nice. I think we can include this in the `code` val as a subcase so we don't need the `var`.






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518064096


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -228,11 +232,17 @@ class AddPartitionsToTxnManager(
             val tp = new TopicPartition(topicResult.name, partitionResult.partitionIndex)
             if (partitionResult.partitionErrorCode != Errors.NONE.code) {
               // Producers expect to handle INVALID_PRODUCER_EPOCH in this scenario.
-              val code =
+              var code =
                 if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
                   Errors.INVALID_PRODUCER_EPOCH.code
                 else
                   partitionResult.partitionErrorCode
+              // For backward compatibility with clients.

Review Comment:
   nice  






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518063503


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3151,6 +3152,45 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+
+    @Test
+    public void testAbortableTxnExceptionIsAnAbortableError() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "textAbortableTxnException", 6, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        Future<RecordMetadata> request = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+        sendIdempotentProducerResponse(0, tp0, Errors.ABORTABLE_TRANSACTION_EXCEPTION, -1);
+
+        // Return InvalidTxnState error. It should be abortable.

Review Comment:
   Is this comment correct?






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518062498


##
clients/src/main/resources/common/message/ProduceRequest.json:
##
@@ -35,7 +35,10 @@
   // Version 9 enables flexible versions.
   //
   // Version 10 is the same as version 9 (KIP-951).
-  "validVersions": "0-10",
+  //
+  // Version 11 is schematically same as Version 10. It will only enable the server to know that

Review Comment:
   For now, let's just say Version 11 is the same as 10. We may decide to do a 
separate bump for the implicit add partitions to transaction.






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1518061441


##
clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AbortableTransactionException extends ApiException {

Review Comment:
   I know there were some questions about the naming -- this seems reasonable 
but just wanted to call it out.
   
   cc: @artemlivshits 






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518058151


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
    private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {
Review Comment:
   I don't think there is a need to have 10KB records regardless -- we can't 
have this size in a single batch consistently.
   
   So while it is ok to configure via the flush method -- let's make the record 
size smaller. We should confirm the batching is as expected when we run the 
tests. :) 






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518055609


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
   `producer.flush` is fine if we set large `batch.size` and `linger.ms` so we 
can control when it happens.






[jira] [Commented] (KAFKA-15649) Handle directory failure timeout

2024-03-08 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824838#comment-17824838
 ] 

Igor Soarez commented on KAFKA-15649:
-

That would be great, please do [~viktorsomogyi]  

> Handle directory failure timeout 
> -
>
> Key: KAFKA-15649
> URL: https://issues.apache.org/jira/browse/KAFKA-15649
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Minor
>
> If a broker with an offline log directory continues to fail to notify the 
> controller of either:
>  * the fact that the directory is offline; or
>  * of any replica assignment into a failed directory
> then the controller will not check if a leadership change is required, and 
> this may lead to partitions remaining indefinitely offline.
> KIP-858 proposes that the broker should shut down after a configurable 
> timeout to force a leadership change. Alternatively, the broker could also 
> request to be fenced, as long as there's a path for it to later become 
> unfenced.
> While this unavailability is possible in theory, in practice it's not easy to 
> entertain a scenario where a broker continues to appear as healthy before the 
> controller, but fails to send this information. So it's not clear if this is 
> a real problem. 
>  
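
A minimal sketch of the timeout mechanism KIP-858 proposes above (illustrative names, not the actual broker code):

```java
class DirectoryFailureTimeoutSketch {
    private final long timeoutMs;
    private volatile long failureNotifiedAtMs = -1;

    DirectoryFailureTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called when a log directory goes offline and the broker starts
    // trying to notify the controller.
    void onDirectoryFailure(long nowMs) {
        failureNotifiedAtMs = nowMs;
    }

    // Called once the controller acknowledges the notification.
    void onControllerAck() {
        failureNotifiedAtMs = -1;
    }

    // Polled periodically: true once the broker should shut down (or ask
    // to be fenced) to force a leadership change off the failed directory.
    boolean shouldShutdown(long nowMs) {
        return failureNotifiedAtMs >= 0 && nowMs - failureNotifiedAtMs >= timeoutMs;
    }
}
```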





[jira] [Created] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-08 Thread Jacek Wojciechowski (Jira)
Jacek Wojciechowski created KAFKA-16357:
---

 Summary: Kafka Client JAR manifest breaks javac linting
 Key: KAFKA-16357
 URL: https://issues.apache.org/jira/browse/KAFKA-16357
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.7.0
 Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
Reporter: Jacek Wojciechowski


I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project no longer builds.

The reason is that kafka-clients-3.7.0.jar contains the following entry in its 
JAR manifest file:

Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
 .5.jar slf4j-api-1.7.36.jar

I'm using a Maven repo to keep my dependencies, and those files are not in the same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's Class-Path are not correct. It fails my build because we build with javac with all linting options on, in particular -Xlint:path. It produces the following warnings from javac:
[WARNING] COMPILATION WARNING : 
[INFO] -
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
 no such file or directory
Since we also have the {{-Werror}} option enabled, it turns warnings into errors and fails our build.

I think our setup is quite typical: using a Maven repo to store dependencies, having linting on, and -Werror. Unfortunately, it doesn't work with the latest kafka-clients because of the entries in the manifest's Class-Path. And I think it might affect quite a lot of projects set up in a similar way.

I don't know the reason for adding the Class-Path entry to the JAR manifest file - but perhaps this effect was not considered.

It would be great if you removed the Class-Path entry from the JAR manifest file.
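
For anyone reproducing this, the entry is easy to inspect (a sketch; the jar path is a placeholder for wherever the artifact sits in your local repo):

```java
import java.util.jar.JarFile;

public class PrintClassPath {
    public static void main(String[] args) throws Exception {
        // Placeholder path; point it at the kafka-clients jar in your Maven repo.
        try (JarFile jar = new JarFile("kafka-clients-3.7.0.jar")) {
            System.out.println(jar.getManifest().getMainAttributes().getValue("Class-Path"));
        }
    }
}
```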

 





[jira] [Created] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-08 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16356:
---

 Summary: Remove class-name dispatch in RemoteLogMetadataSerde
 Key: KAFKA-16356
 URL: https://issues.apache.org/jira/browse/KAFKA-16356
 Project: Kafka
  Issue Type: Task
  Components: Tiered-Storage
Affects Versions: 3.7.0
Reporter: Greg Harris


The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object and has to dispatch to one of four serializers depending on its type. This is done by taking the class name of the RemoteLogMetadata and looking it up in maps to find the corresponding serializer for that class.

This later requires an unchecked cast, because the RemoteLogMetadataTransform is generic. This is all type-unsafe, and it can be replaced with type-safe if-elseif-else statements that may also be faster than the double-indirect map lookups.
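
A self-contained sketch of the proposed shape: replace the className -> serializer map (and its unchecked casts) with type-safe instanceof dispatch. The types below are illustrative stand-ins, not the real RemoteLogMetadata hierarchy.

```java
abstract class Metadata {}

class SegmentMetadata extends Metadata {}

class DeleteMetadata extends Metadata {}

final class MetadataSerde {
    // One branch per concrete subtype; every cast is compiler-checked,
    // and there is no map lookup keyed on a class name.
    byte[] serialize(Metadata metadata) {
        if (metadata instanceof SegmentMetadata) {
            return serializeSegment((SegmentMetadata) metadata);
        } else if (metadata instanceof DeleteMetadata) {
            return serializeDelete((DeleteMetadata) metadata);
        }
        throw new IllegalArgumentException("Unknown type: " + metadata.getClass());
    }

    private byte[] serializeSegment(SegmentMetadata m) { return new byte[0]; } // stub
    private byte[] serializeDelete(DeleteMetadata m) { return new byte[0]; }   // stub
}
```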





Re: [PR] MINOR: Change config. compressionType to be an enum type instead [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15403:
URL: https://github.com/apache/kafka/pull/15403#issuecomment-1986041001

   Hey @testn,
   Tests in the CI seem to be failing due to this change, such as https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15403/6/testReport/junit/kafka.server/DynamicConfigChangeTest/Build___JDK_11_and_Scala_2_13___testMessageFormatVersionChange_String__quorum_zk/
   
   You can verify them by running `./gradlew test` locally. Please check and fix them.





Re: [PR] MINOR: Reduce memory allocation in ClientTelemetryReporter.java [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15402:
URL: https://github.com/apache/kafka/pull/15402#issuecomment-1986036580

   @testn congratulations on your first contribution to Apache Kafka! Hopefully the first of many more to come.





Re: [PR] MINOR: Reduce memory allocation in ClientTelemetryReporter.java [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15402:
URL: https://github.com/apache/kafka/pull/15402#issuecomment-1986032863

   The test failures are unrelated to this change.
   
   @testn You can see the test failures in the orange build by clicking on the 
arrow on top right. We have a flaky test suite but if a test is failing 
consistently, it usually is caused by the change in the PR. In this case, 
things look ok.





Re: [PR] MINOR: Reduce memory allocation in ClientTelemetryReporter.java [kafka]

2024-03-08 Thread via GitHub


divijvaidya merged PR #15402:
URL: https://github.com/apache/kafka/pull/15402





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1517951926


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
 
-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
   > We should probably have both cases.
   
   There are two cases we want to test:
   
   1. three records are in a single request
   2. three records are sent in different requests
   
   Hence, we should use `producer#flush` instead of setting a different record size; the former certainly addresses both cases (see the sketch below). By contrast, we can't ensure that a "larger" record size results in multiple requests, since that depends heavily on the buffer size used by the producer. For example, the test would become pointless if we increased the default buffer size in the future.
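   
   A compact sketch of the flush-driven case (hypothetical helper; producer setup omitted): flushing after each send drains the accumulator, so each record goes out in its own produce request regardless of buffer sizes.
   
   ```java
   import java.util.List;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   class PerRequestSendSketch {
       // Case 2 above: one produce request per record.
       static void sendInSeparateRequests(KafkaProducer<String, String> producer,
                                          List<ProducerRecord<String, String>> records) {
           for (ProducerRecord<String, String> record : records) {
               producer.send(record);
               producer.flush(); // drains the accumulator before the next send
           }
       }
   }
   ```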






[jira] [Updated] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16126:
-
Component/s: kraft

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.7.0, 3.6.2
>
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.
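
A minimal sketch of the register-first ordering described above (all names here are hypothetical illustrations, not the actual broker code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RegisterBeforePublishSketch {
    interface Reconfigurable { void reconfigure(String newConfig); }

    private final List<Reconfigurable> reconfigurables = new CopyOnWriteArrayList<>();

    void addReconfigurable(Reconfigurable r) { reconfigurables.add(r); }

    void publishUpdate(String newConfig) {
        // If this runs before any reconfigurable is registered, the update is
        // silently dropped - that is the race described in this ticket.
        reconfigurables.forEach(r -> r.reconfigure(newConfig));
    }

    public static void main(String[] args) {
        RegisterBeforePublishSketch s = new RegisterBeforePublishSketch();
        s.addReconfigurable(c -> System.out.println("applied " + c)); // 1. register first
        s.publishUpdate("num.io.threads=16");                         // 2. then publish
    }
}
```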





[jira] [Resolved] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-16126.
--
Resolution: Fixed

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.6.2, 3.7.0
>
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.





[jira] [Updated] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16126:
-
Fix Version/s: (was: 3.8.0)

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.7.0, 3.6.2
>
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.





Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15192:
URL: https://github.com/apache/kafka/pull/15192#issuecomment-1985996064

   For completeness, the associated commit was merged to 3.6, 3.7 and trunk:
   
   3.6 - https://github.com/apache/kafka/commit/b743f6fd884132c7a5c4e9d96ed62e3aec29007f
   3.7 - https://github.com/apache/kafka/commit/b40368330814888d7f7f2fda3f5b7ecfa1eabeb2
   trunk - https://github.com/apache/kafka/commit/0015d0f01b130992acc37da85da6ee2088186a1f





[jira] [Commented] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824828#comment-17824828
 ] 

Divij Vaidya commented on KAFKA-16126:
--

Seems like the associated commit in the PR was merged to 3.6, 3.7 and trunk.

3.6 - [https://github.com/apache/kafka/commit/b743f6fd884132c7a5c4e9d96ed62e3aec29007f]
3.7 - [https://github.com/apache/kafka/commit/b40368330814888d7f7f2fda3f5b7ecfa1eabeb2]
trunk - [https://github.com/apache/kafka/commit/0015d0f01b130992acc37da85da6ee2088186a1f]

I am correcting the fix version here and closing this ticket.

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.





[jira] [Updated] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16126:
-
Fix Version/s: 3.6.2
   3.8.0
   3.7.0

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.7.0, 3.6.2, 3.8.0
>
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.





Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-08 Thread via GitHub


johnnychhsu commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1517940750


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,7 +353,10 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
+  val idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(wrapperLogDir, 
segment.baseOffset()), segment.baseOffset(), 1000)

Review Comment:
   learnt a new framework for testing, thanks for sharing this! 






Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15192:
URL: https://github.com/apache/kafka/pull/15192#issuecomment-1985979960

   @cmccabe the test `testStartupWithNonDefaultKControllerDynamicConfiguration` has been failing since it was introduced to the 3.6 branch. See: 
https://ge.apache.org/scans/tests?search.relativeStartTime=P90D=kafka=3.6=Europe%2FBerlin=kafka.server.KRaftClusterTest=testStartupWithNonDefaultKControllerDynamicConfiguration()
 
   
   





Re: [PR] KAFKA-15490: Fix dir path when marking offline [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15490:
URL: https://github.com/apache/kafka/pull/15490#issuecomment-1985969718

   @showuon can you please take a look at this one?





Re: [PR] KAFKA-15490: Fix dir path when marking offline [kafka]

2024-03-08 Thread via GitHub


divijvaidya commented on PR #15490:
URL: https://github.com/apache/kafka/pull/15490#issuecomment-1985969187

   The failing tests are known to be flaky on the 3.6 branch, e.g. 
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=3.6=Europe%2FBerlin=kafka.server.KRaftClusterTest





Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-08 Thread via GitHub


johnnychhsu commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1517906750


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,7 +353,10 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
+  val idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(wrapperLogDir, 
segment.baseOffset()), segment.baseOffset(), 1000)

Review Comment:
   thanks for the suggestion! let me check that






Re: [PR] KAFKA-16318 [WIP]: add javafoc for kafka metric [kafka]

2024-03-08 Thread via GitHub


johnnychhsu commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1517905708


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -20,6 +20,37 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Time;
 
+/**
+ * An implementation of the {@link Metric} interface.
+ * 
+ * A KafkaMetric is a named metric for monitoring purposes. The metric value can be a {@link Measurable} or a {@link Gauge}.
+ * 
+ * metricName The name of the metric
+ * lock A lock used for reading the metric value in case of race 
condition
+ * time The POSIX time in milliseconds the metric is being taken
+ * metricValueProvider The metric collecting implementation that 
implements {@link MetricValueProvider}
+ * config The metric configuration which is a {@link MetricConfig}
+ * 
+ * 
+ * Usage looks something like this:
+ *
+ * {@code
+ * // set up metrics:
+ *
+ * Map<String, String> tags = new HashMap<>();
+ * tags.put("key1", "value1");
+ *
+ * MetricConfig config = new MetricConfig().tags(tags);
+ * Time time = new SystemTime();
+ * metricName = new MetricName("message-size-max", "producer-metrics");
+ *
+ * KafkaMetric m = new KafkaMetric(new Object(),

Review Comment:
   thanks for the comment! yes that makes sense to me, I think I missed the 
point.
   Just updated and added doc for public methods, thanks! 
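
   A self-contained version of the javadoc usage example, for reference (the description string and the use of `Max` as the measurable are illustrative assumptions, not part of this PR):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.KafkaMetric;
   import org.apache.kafka.common.metrics.MetricConfig;
   import org.apache.kafka.common.metrics.stats.Max;
   import org.apache.kafka.common.utils.Time;
   
   public class KafkaMetricSketch {
       public static void main(String[] args) {
           Map<String, String> tags = new HashMap<>();
           tags.put("key1", "value1");
   
           MetricConfig config = new MetricConfig().tags(tags);
           MetricName name = new MetricName("message-size-max", "producer-metrics",
                   "The maximum message size.", tags);
   
           Max max = new Max();  // the Measurable backing the metric
           max.record(config, 100.0, Time.SYSTEM.milliseconds());
   
           KafkaMetric metric = new KafkaMetric(new Object(), name, max, config, Time.SYSTEM);
           System.out.println(metric.metricValue());  // prints 100.0
       }
   }
   ```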






Re: [PR] MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT [kafka]

2024-03-08 Thread via GitHub


chia7712 merged PR #15431:
URL: https://github.com/apache/kafka/pull/15431





Re: [PR] MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on PR #15431:
URL: https://github.com/apache/kafka/pull/15431#issuecomment-1985910550

   > Is there a reason why we've not merged this yet?
   
   This PR is ready to be merged, I think. 
   
   I will merge it.





[jira] [Commented] (KAFKA-15649) Handle directory failure timeout

2024-03-08 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824815#comment-17824815
 ] 

Viktor Somogyi-Vass commented on KAFKA-15649:
-

[~soarez] would you mind if I picked this up?

> Handle directory failure timeout 
> -
>
> Key: KAFKA-15649
> URL: https://issues.apache.org/jira/browse/KAFKA-15649
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Minor
>
> If a broker with an offline log directory continues to fail to notify the 
> controller of either:
>  * the fact that the directory is offline; or
>  * of any replica assignment into a failed directory
> then the controller will not check if a leadership change is required, and 
> this may lead to partitions remaining indefinitely offline.
> KIP-858 proposes that the broker should shut down after a configurable 
> timeout to force a leadership change. Alternatively, the broker could also 
> request to be fenced, as long as there's a path for it to later become 
> unfenced.
> While this unavailability is possible in theory, in practice it's not easy to 
> entertain a scenario where a broker continues to appear as healthy before the 
> controller, but fails to send this information. So it's not clear if this is 
> a real problem. 
>  
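
A rough sketch of the proposed timeout safeguard (every name below is a hypothetical illustration, not actual broker code):

```java
import java.util.concurrent.TimeUnit;

public class DirFailureTimeoutSketch {
    interface Controller {
        boolean hasAcknowledgedFailure(String logDir);
    }

    // If the controller never acknowledges the failed directory within the
    // configured timeout, shut the broker down so that a leadership change
    // is forced for the partitions hosted on that directory.
    static void awaitAckOrShutDown(Controller controller, String failedDir,
                                   long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!controller.hasAcknowledgedFailure(failedDir)) {
            if (System.currentTimeMillis() > deadline) {
                System.err.printf("No ack for failed dir %s within %d ms; shutting down%n",
                        failedDir, timeoutMs);
                System.exit(1); // stand-in for a controlled broker shutdown
            }
            TimeUnit.MILLISECONDS.sleep(100); // retry backoff
        }
    }
}
```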





[jira] [Assigned] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-03-08 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-16355:
-

Assignee: PoAn Yang

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> ---
>
> Key: KAFKA-16355
> URL: https://issues.apache.org/jira/browse/KAFKA-16355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Mickael Maison
>Assignee: PoAn Yang
>Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> 

Re: [PR] MINOR: Bump 3.7 branch's version to 3.7.1-SNAPSHOT [kafka]

2024-03-08 Thread via GitHub


mimaison commented on PR #15431:
URL: https://github.com/apache/kafka/pull/15431#issuecomment-1985770789

   Is there a reason why we've not merged this yet?





Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-08 Thread via GitHub


OmniaGM commented on PR #15335:
URL: https://github.com/apache/kafka/pull/15335#issuecomment-1985757706

   @soarez, @pprovenzano and @showuon can you please have a look?





Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-08 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1517756842


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -289,13 +289,10 @@ class BrokerMetadataPublisher(
 try {
   // Start log manager, which will perform (potentially lengthy)
   // recovery-from-unclean-shutdown if required.
-  logManager.startup(metadataCache.getAllTopics())
-
-  // Delete partition directories which we're not supposed to have. We have
-  // to do this before starting ReplicaManager, so that the stray replicas
-  // don't block creation of new ones with different IDs but the same 
names.
-  // See KAFKA-14616 for details.
-  logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+  logManager.startup(
+metadataCache.getAllTopics(),
+shouldBeStrayKraftLog = log => 
LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)

Review Comment:
   I updated this with the suggested comment. 






Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-08 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1517754815


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File],
 } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
   addStrayLog(topicPartition, log)
   warn(s"Loaded stray log: $logDir")
+} else if (shouldBeStrayKraftLog(log)) {
+  // Mark the partition directories we're not supposed to have as stray. 
We have to do this
+  // during log load because topics may have been recreated with the same 
name while a disk
+  // was offline.
+  // See KAFKA-16234, KAFKA-16157 and KAFKA-14616 for details.

Review Comment:
   Updated this comment






[jira] [Commented] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-03-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824797#comment-17824797
 ] 

Mickael Maison commented on KAFKA-16355:


Sure

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> ---
>
> Key: KAFKA-16355
> URL: https://issues.apache.org/jira/browse/KAFKA-16355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Mickael Maison
>Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> 

[jira] [Commented] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-03-08 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824796#comment-17824796
 ] 

PoAn Yang commented on KAFKA-16355:
---

Hi [~mimaison], I am interested in this issue. May I take it? Thank you.

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> ---
>
> Key: KAFKA-16355
> URL: https://issues.apache.org/jira/browse/KAFKA-16355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Mickael Maison
>Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> 

Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-08 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1517714052


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* RE2J compatible regex */
+private SubscriptionPattern subscriptionPattern;

Review Comment:
   Nvm, I think adding this class would be a bit much when it only encapsulates the logic to check whether pattern or subscriptionPattern is set.






[jira] [Created] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-03-08 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16355:
--

 Summary: ConcurrentModificationException in 
InMemoryTimeOrderedKeyValueBuffer.evictWhile
 Key: KAFKA-16355
 URL: https://issues.apache.org/jira/browse/KAFKA-16355
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.1
Reporter: Mickael Maison


While a Streams application was restoring its state after an outage, it hit the 
following:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, partition=16, 
offset=454875695, stacktrace=java.util.ConcurrentModificationException
at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
at 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
at 
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at 
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
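
A minimal, self-contained reproduction of the failure mode in this trace (a hypothetical illustration, not Streams code): structurally modifying a TreeMap behind an open entry iterator makes the iterator's own remove() throw, which is exactly the TreeMap$PrivateEntryIterator.remove frame above. One way this could happen in evictWhile is if the eviction callback indirectly mutates the buffer while the iteration is still in progress.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class TreeMapCmeSketch {
    public static void main(String[] args) {
        TreeMap<Long, String> buffer = new TreeMap<>();
        buffer.put(1L, "a");
        buffer.put(2L, "b");

        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        it.next();
        buffer.remove(2L); // structural modification behind the iterator's back
        it.remove();       // throws ConcurrentModificationException
    }
}
```
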

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-08 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1517690579


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List<Record> records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List<Record> records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), 

Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]

2024-03-08 Thread via GitHub


msn-tldr commented on PR #15498:
URL: https://github.com/apache/kafka/pull/15498#issuecomment-1985641282

   @hachikuji / @ijuma can you help in merging this?





[PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]

2024-03-08 Thread via GitHub


msn-tldr opened a new pull request, #15498:
URL: https://github.com/apache/kafka/pull/15498

   NOTE: this cherry-picks the fix from https://github.com/apache/kafka/pull/14522 into 3.6
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]

2024-03-08 Thread via GitHub


msn-tldr commented on PR #15493:
URL: https://github.com/apache/kafka/pull/15493#issuecomment-1985621897

   @hachikuji / @ijuma would you be able to merge this?
   
   The test failures on jenkins are flaky and unrelated.





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-08 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1517646778


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -494,6 +501,14 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol 
groupProtocol) {
 () -> consumer.subscribe(Pattern.compile("")));
 }
 
+@ParameterizedTest
+@EnumSource(value = GroupProtocol.class, names = "CONSUMER")

Review Comment:
   I got it wrong here. The subscriptionPattern is not supported in the classic 
protocol.






Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-08 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1517636440


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* RE2J compatible regex */
+private SubscriptionPattern subscriptionPattern;

Review Comment:
   On second thought, we should adopt this class.






Re: [PR] KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-08 Thread via GitHub


nizhikov commented on PR #15465:
URL: https://github.com/apache/kafka/pull/15465#issuecomment-1985445590

   @chia7712 I've checked the CI results - it looks like the failures are unrelated to the PR changes.





Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 3 [kafka]

2024-03-08 Thread via GitHub


clolov commented on PR #15497:
URL: https://github.com/apache/kafka/pull/15497#issuecomment-1985421216

   Heya @cadonna! This should be the quick follow-up to https://github.com/apache/kafka/pull/15261 to fully move the consumer mock.





[PR] Add fast_commit to ext4 tuning docs [kafka]

2024-03-08 Thread via GitHub


moscicky opened a new pull request, #15496:
URL: https://github.com/apache/kafka/pull/15496

   This PR adds the `fast_commit` tuning option to the EXT4 notes. We have found this option to be the most performant for the EXT4 filesystem. The details of our analysis and findings are described in a [blogpost](https://blog.allegro.tech/2024/03/kafka-performance-analysis.html).
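
   For reference, a sketch of how `fast_commit` can be enabled (the device name is a placeholder; the feature needs Linux 5.10+ and e2fsprogs 1.46+, so please verify against your distribution):
   
   ```sh
   # enable at filesystem creation time
   mkfs.ext4 -O fast_commit /dev/sdX
   # or on an existing, unmounted filesystem
   tune2fs -O fast_commit /dev/sdX
   ```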
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   

