[GitHub] [kafka] sagarrao12 commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …

2023-06-13 Thread via GitHub


sagarrao12 commented on PR #13453:
URL: https://github.com/apache/kafka/pull/13453#issuecomment-1590511123

   @enzo-cappa, thanks. Yeah, waiting for a round of review on this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-13 Thread via GitHub


satishd commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1590492505

   > I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created [KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue.
   
   @showuon This is no longer valid; the KIP needs to be updated with the prefix-based configs for RSM and RLMM. I will update the KIP with those details.
   
   





[GitHub] [kafka] dengziming commented on a diff in pull request #13845: KAFKA-15078; KRaft leader replies with snapshot for offset 0

2023-06-13 Thread via GitHub


dengziming commented on code in PR #13845:
URL: https://github.com/apache/kafka/pull/13845#discussion_r1228931662


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest(
 long fetchOffset = request.fetchOffset();
 int lastFetchedEpoch = request.lastFetchedEpoch();
 LeaderState<T> state = quorum.leaderStateOrThrow();
-ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
+final ValidOffsetAndEpoch validOffsetAndEpoch;
+if (fetchOffset == 0 && latestSnapshotId.isPresent()) {

Review Comment:
   I have tried to figure out some other cases where we would prefer to read the snapshot; a common case is `highWatermark` - `fetchOffset` > count(snapshot records). Do you think it's possible to write count(snapshot records) in the snapshot header?






[GitHub] [kafka] lihaosky opened a new pull request, #13851: check enable rack

2023-06-13 Thread via GitHub


lihaosky opened a new pull request, #13851:
URL: https://github.com/apache/kafka/pull/13851

   ## Description
   Initial implementation to check if rack aware assignment can be enabled
   
   ## Test
   TODO
   





[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-13 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1590237911

   
unit.kafka.server.AddPartitionsToTxnManagerTest.testAddPartitionsToTxnManagerMetrics() is failing. I was a bit worried this could be flaky. Will investigate.





[GitHub] [kafka] jolshan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-13 Thread via GitHub


jolshan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1590186665

   With respect to the retries: there is currently a known error where, if we produce using an idempotent producer, we can get stuck in a retry loop.
   
   1. Request A with sequence 0 fails with a retriable error.
   2. Request B with sequence 1 succeeds. (Because we currently don't assert that the sequence is 0 on the first records written, or on any records written when we don't yet have producer state, this will succeed.)
   3. Request A will be retried and will always fail with a retriable OutOfOrderSequence error.
   
   Imagine at step 2 we have a request C that fails with a non-retriable error. Will we get stuck waiting for request A until the retries run out? The default retry count is pretty high, so this could go on for some time.
   
   BTW this issue is https://issues.apache.org/jira/browse/KAFKA-14359, and we are still considering the best way to fix it.
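   For reference, a minimal sketch of the client settings this scenario involves; the broker address and topic are hypothetical, and the values shown reflect the client defaults as I understand them, not anything prescribed in the comment above:
   
   ```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence assigns each batch a per-partition sequence number (0, 1, ...).
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // retries defaults to Integer.MAX_VALUE, which is why a request stuck on a
        // retriable OutOfOrderSequence error can keep retrying for a long time.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Several batches may be in flight at once, so request B (sequence 1) can
        // succeed while request A (sequence 0) is still failing and retrying.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // hypothetical topic
        }
    }
}
```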
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-13 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1228732870


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,16 +240,17 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val metricNames =
     if (header.apiKey == ApiKeys.FETCH) {
-      val isFromFollower = body[FetchRequest].isFromFollower
-      Seq(
-        if (isFromFollower) RequestMetrics.followFetchMetricName
+      val specifiedMetricName =
+        if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName
         else RequestMetrics.consumerFetchMetricName
-      )
+      Seq(specifiedMetricName, header.apiKey.name)
+    } else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].verifyOnlyRequest()) {
+      Seq(RequestMetrics.verifyPartitionsInTxnMetricName)
+    } else {
+      Seq(header.apiKey.name)
     }
-    else Seq.empty
-  val metricNames = fetchMetricNames :+ header.apiKey.name

Review Comment:
   Note: Like fetch metrics, both verification and non-verification AddPartitionsToTxn requests will count towards the rate for the request type. This is because the metric is keyed on the apiKey, and I can't distinguish the two. See below:
   ```
m.requestRate(header.apiVersion).mark()
```






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-13 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1228729149


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3405,59 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testBackgroundInvalidStateTransitionIsFatal() {
+doInitTransactions();
+assertTrue(transactionManager.isTransactional());
+
+transactionManager.setPoisonStateOnInvalidTransition(true);

Review Comment:
   This works because we create a new transactionManager object per run of the test (i.e., the thread-local variable isn't reused between tests).
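   To illustrate the pattern being relied on here, a toy sketch with hypothetical names (not the real TransactionManager): when the ThreadLocal is an instance field and each test constructs a fresh object, state set in one test cannot leak into the next, even though JUnit may run both tests on the same worker thread.
   
   ```
class PoisonableManager {
    // Instance-scoped ThreadLocal: a fresh manager gets a fresh ThreadLocal,
    // so state set here in one test is invisible to the next test's manager.
    private final ThreadLocal<Boolean> poisoned = ThreadLocal.withInitial(() -> false);

    void setPoisonStateOnInvalidTransition(boolean poison) {
        poisoned.set(poison);
    }

    boolean isPoisoned() {
        return poisoned.get();
    }
}
```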






[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-13 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1228731391


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -85,12 +93,14 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
 topicPartitionsToError.put(new TopicPartition(topic.name, partition), error)
   }
 }
+verificationFailureRate.mark(topicPartitionsToError.size)
 topicPartitionsToError.toMap
   }
 
   private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
   // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread.
+  verificationTimeMs.update(time.milliseconds() - transactionDataAndCallbacks.earliestAdditionMs)

Review Comment:
   I've decided to fix this to have an update per produce request that comes 
into the manager.






[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-13 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1590069645

   Thanks @kirktrue, this looks reasonable. Let's fix the conflict and let 
tests run one more time.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1228495958


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde<K> keySerde;
+private Serde<V> valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());

Review Comment:
   Why is it preferable for the root to be initialized as the inner store, 
rather than this outer store? (Just trying to understand.) 
   
   It looks like most of the other stores use `root` in this method and leave 
it up to the caller to set the root accordingly, usually as the store itself if 
it's an outermost store. Curious why this store is different.






[GitHub] [kafka] divijvaidya commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-13 Thread via GitHub


divijvaidya commented on PR #13820:
URL: https://github.com/apache/kafka/pull/13820#issuecomment-1589745577

   Hi @dajac , yes let me take a look first thing tomorrow.





[GitHub] [kafka] dajac commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-13 Thread via GitHub


dajac commented on PR #13820:
URL: https://github.com/apache/kafka/pull/13820#issuecomment-1589741122

   @divijvaidya As you started reviewing it, are you interested in finishing 
the review? That would unblock me.





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1228430026


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde<K> keySerde;
+private Serde<V> valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback) {
+KeyValue<Bytes, byte[]> keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+if (wrapped().observedStreamTime - gracePeriod > 
minTimestamp) {
+return;
+}
+
+V value = valueSerde.deserializer().deserialize(topic, 
bufferValue.newValue());
+
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+bufferSize = 

[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


wcarlson5 commented on PR #13756:
URL: https://github.com/apache/kafka/pull/13756#issuecomment-1589678126

   @cadonna I also hid the segments as you proposed





[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests

2023-06-13 Thread via GitHub


dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1228424644


##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
 listenerName: ListenerName = listenerName,
 adminClientConfig: Properties = new Properties
   ): Unit = {
-if (isKRaftTest()) {
-  resource(createAdminClient(brokers, listenerName, adminClientConfig)) { 
admin =>
-TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
-  }
-} else {
-  TestUtils.createOffsetsTopic(zkClient, servers)

Review Comment:
   Ouch. Many tests have failed without this. I will investigate...






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1228419948


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {

Review Comment:
   I don't mind obscuring the implementation and setting those values as 
default instead of exposing them.






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1228360963


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde<K> keySerde;
+private Serde<V> valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback) {
+KeyValue<Bytes, byte[]> keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+if (wrapped().observedStreamTime - gracePeriod > 
minTimestamp) {
+return;
+}
+
+final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+

[GitHub] [kafka] divijvaidya opened a new pull request, #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-13 Thread via GitHub


divijvaidya opened a new pull request, #13850:
URL: https://github.com/apache/kafka/pull/13850

   ## Problem
   The RemoteIndexCache is accessed from multiple threads concurrently in the fetch-from-consumer code path [1].
   
   Currently, the RemoteIndexCache uses LinkedHashMap as the cache implementation internally. Since LinkedHashMap is not a thread-safe data structure, we use a coarse-grained lock on the entire map/cache when writing to the cache.
   
   This means that if one thread is fetching information for a particular segment from the RemoteStorageManager, other threads trying to access a different segment from the cache will also wait for that thread to complete. This is due to the use of a global lock in the cache.
   
   This lock contention leads to a decrease in fetch-from-consumer throughput in cases where the RSM network call may take more time.
   
   ## Solution
   We need a data structure for the cache which satisfies the following requirements:
   1. Multiple threads should be able to read concurrently.
   2. Fetching a missing key should not block reads of available keys.
   3. Only one thread should fetch any given key.
   4. It should support an LRU eviction policy.
   
   In Java, all non-concurrent data structures (such as LinkedHashMap) violate condition 2. We could potentially use a concurrent data structure such as ConcurrentHashMap, but we would have to implement the LRU eviction ourselves on top of it, or implement an LRU cache from scratch that satisfies the above constraints.
   
   Alternatively (the approach taken in this PR), we can use the [Caffeine cache](https://github.com/ben-manes/caffeine), which satisfies all the requirements mentioned above; a minimal sketch follows.
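   As a rough illustration of why Caffeine fits these requirements, here is a minimal sketch; the class and the key/value types are hypothetical stand-ins, not the actual RemoteIndexCache code:
   
   ```
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class IndexCacheSketch {
    // Size-bounded cache with LRU-style eviction (requirement 4); reads do not
    // take a global lock (requirement 1).
    private final Cache<Long, String> cache = Caffeine.newBuilder()
            .maximumSize(1024)
            .build();

    public String get(long segmentId) {
        // Per-key loading: only one thread computes a missing key (requirement 3),
        // and hits on other keys are served without blocking (requirement 2).
        return cache.get(segmentId, this::fetchFromRemoteStorage);
    }

    private String fetchFromRemoteStorage(long segmentId) {
        return "index-" + segmentId; // stand-in for the expensive RSM network call
    }
}
```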
   
   ## Changes
   - This PR uses Caffeine as the underlying cache for RemoteIndexCache.
   - The old `File` API has been replaced with the `Files` API introduced in JDK 7.
   
   ## Testing
   - A test has been added which verifies requirement 2 above. The test fails prior to the change and succeeds after it.
   - New tests have been added to improve overall test coverage.





[jira] [Created] (KAFKA-15084) Remove lock contention in RemoteIndexCache

2023-06-13 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15084:


 Summary: Remove lock contention in RemoteIndexCache
 Key: KAFKA-15084
 URL: https://issues.apache.org/jira/browse/KAFKA-15084
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 3.6.0
Reporter: Divij Vaidya
Assignee: Divij Vaidya
 Fix For: 3.6.0


The RemoteIndexCache is accessed from multiple threads concurrently in the fetch-from-consumer code path [1].

Currently, the RemoteIndexCache uses LinkedHashMap as the cache implementation internally. Since LinkedHashMap is not a thread-safe data structure, we use a coarse-grained lock on the entire map/cache when writing to the cache.

This means that if one thread is fetching information for a particular segment from the RemoteStorageManager, other threads trying to access a different segment from the cache will also wait for that thread to complete. This is due to the use of a global lock in the cache.

This lock contention leads to a decrease in fetch-from-consumer throughput in cases where the RSM network call may take more time.

As a goal for this JIRA, we would like to ensure that threads reading existing values in the cache do not get blocked while the thread updating the cache is fetching data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13849: Add 3.5.0 and 3.4.1 to system tests

2023-06-13 Thread via GitHub


mimaison commented on PR #13849:
URL: https://github.com/apache/kafka/pull/13849#issuecomment-1589399101

   @mjsax @vvcephei Can you upload the 3.5.0 artifacts to the Confluent S3 bucket as per the [release instructions](https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses)? And verify that the 3.4.1 artifacts are there too.





[jira] [Resolved] (KAFKA-15080) Fetcher's lag never set when partition is idle

2023-06-13 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15080.
-
Fix Version/s: 3.5.1, 3.6.0
   Resolution: Fixed

> Fetcher's lag never set when partition is idle
> --
>
> Key: KAFKA-15080
> URL: https://issues.apache.org/jira/browse/KAFKA-15080
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.5.1, 3.6.0
>
>
> The PartitionFetchState's lag field is set to None when the state is created 
> and it is updated when bytes are received for a partition. For idle 
> partitions (newly created or not), the lag is never updated because 
> `validBytes > 0` is never true. As a side effect, the partition is considered 
> out-of-sync and could be incorrectly throttled.





[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-13 Thread via GitHub


mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1228123874


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet interests, TopicVisitor
 (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
 writer.handleSnapshot(image, consumer);
 assertEquals(1, opCounts.remove("CreateTopic"));
-assertEquals(1, opCounts.remove("UpdatePartition"));
+assertEquals(1, opCounts.remove("UpdatePartitions"));
 assertEquals(1, opCounts.remove("UpdateTopic"));
 assertEquals(0, opCounts.size());
 assertEquals("bar", topicClient.createdTopics.get(0));
 }
+
+@Test
+public void testDeleteTopicFromSnapshot() {
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+visitor.visitTopic("spam", Uuid.randomUuid(), 
Collections.emptyMap());
+}
+};
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+Map opCounts = new HashMap<>();
+KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+(logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+
+opCounts.clear();
+topicClient.reset();
+writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(2, opCounts.remove("CreateTopic"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+}
+
+@FunctionalInterface
+interface TopicVerifier {
+void verify(Uuid topicId, TopicsImage topicsImage, 
CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+}
+
+void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+// Set up a topic with two partitions in ZK (via iterateTopics) and a 
KRaft TopicsImage, then run the given verifier
+Uuid topicId = Uuid.randomUuid();
+Map partitionMap = new HashMap<>();
+partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new 
int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, 
-1));
+partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new 
int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, 
-1));
+
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+Map> assignments = new HashMap<>();
+assignments.put(0, Arrays.asList(2, 3, 4));
+assignments.put(1, Arrays.asList(3, 4, 5));
+visitor.visitTopic("spam", topicId, assignments);
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 0)), partitionMap.get(0));
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 1)), partitionMap.get(1));
+}
+};
+
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 
0).message());
+delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 
1).message());
+TopicsImage image = delta.apply();
+
+verifier.verify(topicId, image, topicClient, writer);
+}
+
+@Test
+public void testUpdatePartitionsFromSnapshot() {
+setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+ 

[GitHub] [kafka] dajac merged pull request #13843: KAFKA-15080; Fetcher's lag never set when partition is idle

2023-06-13 Thread via GitHub


dajac merged PR #13843:
URL: https://github.com/apache/kafka/pull/13843





[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-13 Thread via GitHub


showuon commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1228079251


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2925,123 +2919,11 @@ void handleFailure(Throwable throwable) {
 @Override
 public DeleteRecordsResult deleteRecords(final Map<TopicPartition, RecordsToDelete> recordsToDelete,
  final DeleteRecordsOptions options) {
+AdminApiFuture.SimpleAdminApiFuture<TopicPartition, DeletedRecords> future = DeleteRecordsHandler.newFuture(recordsToDelete.keySet());

Review Comment:
   nit: `AdminApiFuture.SimpleAdminApiFuture` -> `SimpleAdminApiFuture` should 
work.



##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched<TopicPartition, DeletedRecords> {
+
+    private final Map<TopicPartition, RecordsToDelete> recordsToDelete;
+    private final Logger log;
+    private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+    public DeleteRecordsHandler(
+        Map<TopicPartition, RecordsToDelete> recordsToDelete,
+        LogContext logContext
+    ) {
+        this.recordsToDelete = recordsToDelete;
+        this.log = logContext.logger(DeleteRecordsHandler.class);
+        this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteRecords";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+        return this.lookupStrategy;
+    }
+
+    public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
+        Collection<TopicPartition> topicPartitions
+    ) {
+        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+    }
+
+    @Override
+    public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
+        Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
+        for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.computeIfAbsent(
+                topicPartition.topic(),
+                key -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
+            );
+            deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition()
+                .setPartitionIndex(topicPartition.partition())
+                .setOffset(entry.getValue().beforeOffset()));
+        }
+
+        DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+            .setTopics(new ArrayList<>(deletionsForTopic.values()));

Review Comment:
   Should we set timeout here like before?




[GitHub] [kafka] dajac merged pull request #13844: MINOR: Make sure replicas will not be removed in initial ISR

2023-06-13 Thread via GitHub


dajac merged PR #13844:
URL: https://github.com/apache/kafka/pull/13844





[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2023-06-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732032#comment-17732032
 ] 

Bruno Cadonna commented on KAFKA-12679:
---

[~Andras Hatvani] Lucas' PR solves the issue for the state updater (aka new 
restoration architecture). It does not solve it for the current initialization. 
   

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Priority: Major
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
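A rough sketch of the shape of fix the attached patch describes (detect the scenario and sleep for the backoff time); the names and the backoff parameter are illustrative, not the actual patch:

{code:java}
import org.apache.kafka.streams.errors.LockException;

// Illustrative only: retry task initialization with a backoff on LockException,
// instead of the tight lock-loop described above. Names are hypothetical.
final class LockBackoffSketch {
    static void initializeWithBackoff(Runnable initTask, long backoffMs) throws InterruptedException {
        while (true) {
            try {
                initTask.run(); // stands in for locking the state directory and initializing the task
                return;
            } catch (final LockException e) {
                Thread.sleep(backoffMs); // back off so this loop cannot starve the thread holding the lock
            }
        }
    }
}
{code}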





[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2023-06-13 Thread Andras Hatvani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732025#comment-17732025
 ] 

Andras Hatvani edited comment on KAFKA-12679 at 6/13/23 11:08 AM:
--

[~lbrutschy] [~mjsax] This is a pressing issue for my client, as I could only work around it by reducing {{num.stream.threads}} to {{1}}, which shouldn't be necessary.


was (Author: andras hatvani):
[~lbrutschy] [~mjsax] This is a pressing issue for my client and I've worked it 
around by reducing `num.stream.threads` to `1` which shouldn't be necessary.





[GitHub] [kafka] cadonna commented on pull request #12875: KAFKA-12679: Handle lock exceptions in state updater

2023-06-13 Thread via GitHub


cadonna commented on PR #12875:
URL: https://github.com/apache/kafka/pull/12875#issuecomment-1589078205

   @andrashatvani Do you mean the state updater?
   I am currently working on enabling the state updater by default on trunk. We 
plan to officially release it in 3.6.
   However, you can already test it in 3.5 by setting the internal config 
`__state.updater.enabled__` to true. Note that we recently found and [fixed a 
bug](https://github.com/apache/kafka/pull/13829) in the state updater. The fix 
is only on trunk.
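   For reference, a minimal sketch of opting in to that internal flag; the config key is quoted from the comment above, while the application id and bootstrap servers are placeholders:
   
   ```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateUpdaterOptIn {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Internal, unsupported flag quoted from the comment above;
        // not part of the public config surface and may change without notice.
        props.put("__state.updater.enabled__", true);
        return props;
    }
}
```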





[GitHub] [kafka] mimaison opened a new pull request, #13849: Add 3.5.0 and 3.4.1 to system tests

2023-06-13 Thread via GitHub


mimaison opened a new pull request, #13849:
URL: https://github.com/apache/kafka/pull/13849

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response

2023-06-13 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732020#comment-17732020
 ] 

çağla boynueğri commented on KAFKA-14257:
-

For anyone wondering: all the brokers must have the same CLUSTER_ID :)
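
If the ids diverge (e.g. each node was formatted with its own random uuid), 
voters reject each other's VOTE requests with INCONSISTENT_CLUSTER_ID. In 
KRaft mode each node's meta.properties (under log.dirs) must carry the same 
cluster.id. Illustrative contents (the id value here is made up):
{code:java}
# /data01/kafka323-logs/meta.properties -- identical across nodes except node.id
cluster.id=p8fFEbKGQ22B6M_Da_vCBw
node.id=1
version=1
{code}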

> Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
> -
>
> Key: KAFKA-14257
> URL: https://issues.apache.org/jira/browse/KAFKA-14257
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.3
>Reporter: jianbin.chen
>Priority: Major
>
> Please help me understand why the error message is logged indefinitely
> broker1:
> {code:java}
> process.roles=broker,controller
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=1
> listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093
> inter.broker.listener.name=PLAINTEXT
> advertised.listeners=PLAINTEXT://192.168.6.57:9092
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker2
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=2
> listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker3
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> node.id=3
> listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> error msg:
> {code:java}
> [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response

2023-06-13 Thread Jira


[ https://issues.apache.org/jira/browse/KAFKA-14257 ]


çağla boynueğri deleted comment on KAFKA-14257:
-

was (Author: JIRAUSER300861):
Hi,
I am getting the same error. Do you remember how you solved the issue?

> Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
> -
>
> Key: KAFKA-14257
> URL: https://issues.apache.org/jira/browse/KAFKA-14257
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.3
>Reporter: jianbin.chen
>Priority: Major
>
> Please help me understand why the error message is logged indefinitely
> broker1:
> {code:java}
> process.roles=broker,controller
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=1
> listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093
> inter.broker.listener.name=PLAINTEXT
> advertised.listeners=PLAINTEXT://192.168.6.57:9092
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker2
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=2
> listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker3
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> node.id=3
> listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> error msg:
> {code:java}
> [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response

2023-06-13 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732014#comment-17732014
 ] 

çağla boynueğri commented on KAFKA-14257:
-

Hi,
I am getting the same error. Do you remember how you solved the issue?

> Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
> -
>
> Key: KAFKA-14257
> URL: https://issues.apache.org/jira/browse/KAFKA-14257
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.3
>Reporter: jianbin.chen
>Priority: Major
>
> Please help me understand why the error message is logged indefinitely
> broker1:
> {code:java}
> process.roles=broker,controller
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=1
> listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093
> inter.broker.listener.name=PLAINTEXT
> advertised.listeners=PLAINTEXT://192.168.6.57:9092
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker2
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=2
> listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker3
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> node.id=3
> listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> error msg:
> {code:java}
> [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1227894014


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {

Review Comment:
   @wcarlson5 Sorry for commenting again on the segmented state stores, but I 
really want to understand them since they are quite new to me. 
   As far as I understand, you extend 
`AbstractRocksDBTimeOrderedSegmentedBytesStore`, which is a state store that 
stores records in timestamp order in segments and keeps an index on the record 
key. It seems to me that you use neither the segments nor the index. I 
understand that `AbstractRocksDBTimeOrderedSegmentedBytesStore` implements 
interface `SegmentedByteStore`, which gives you the ability to fetch data by 
time ranges. Furthermore, `AbstractRocksDBTimeOrderedSegmentedBytesStore` 
maintains the observed stream time, which also comes in handy for your PR.
   
   My proposal is to hide the segmented aspect of the state store as much as 
possible. More precisely: 
   - Rename `RocksDBTimeOrderedKeyValueSegmentedBytesStore` to 
`RocksDBTimeOrderedKeyValueBytesStore`
   - Do not expose `retention` and `segmentInterval` in 
`RocksDBTimeOrderedKeyValueBytesStore`'s constructor and in the corresponding 
supplier.
   - Set both `retention` and `segmentInterval` in the call to the super 
constructor to high values as @vcrfxia suggested.
   
   In such a way, you can use the features you want from the time-ordered 
segmented state store and at the same time hide all the aspects that you do 
not need behind your abstraction. WDYT? 
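   A self-contained sketch of that idea (names are illustrative, not the 
actual Streams classes):
   ```java
   // The base store exposes segment parameters; the wrapper pins them so
   // callers of the new store never see retention/segmentInterval.
   class SegmentedStore {
       final long retention;
       final long segmentInterval;

       SegmentedStore(final long retention, final long segmentInterval) {
           this.retention = retention;
           this.segmentInterval = segmentInterval;
       }
   }

   class TimeOrderedBytesStore extends SegmentedStore {
       TimeOrderedBytesStore() {
           // Effectively no expiry and a single huge segment, as suggested above.
           super(Long.MAX_VALUE, Long.MAX_VALUE);
       }
   }
   ```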
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-13 Thread via GitHub


showuon commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1588961245

   @divijvaidya @satishd , PR updated. Thanks.
   
   > 1. We probably want to update the KIP-405 here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs
 and specify that this config is optional
   
   Updated.
   
   > 2. From the javadoc of RLMM
   > When this is configured all other
   >  required properties can be passed as properties with prefix of 
'remote.log.metadata.manager.listener
   > Can we please add a test to verify this? (asking because while 
constructing the rlmmProps, we don't pass any other configs with the listener 
prefix)
   
   I think we don't have this implemented. We should pass 
`remote.log.metadata.*` into RLMM based on KIP-405. Created 
[KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue.
   
   Thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-13 Thread via GitHub


showuon commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1227858635


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -474,7 +474,14 @@ class BrokerServer(
   new KafkaConfig(config.originals(), true)
 
   // Start RemoteLogManager before broker start serving the requests.
-  remoteLogManager.foreach(_.startup())
+  remoteLogManagerOpt.foreach(rlm => {
+val listenerName = 
ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+endpoints.stream.filter(e => e.listenerName.equals(listenerName))
+  .findAny()

Review Comment:
   Agree. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-06-13 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15083:
-

 Summary: Passing "remote.log.metadata.*" configs into RLMM
 Key: KAFKA-15083
 URL: https://issues.apache.org/jira/browse/KAFKA-15083
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_remote.log.metadata.*_|The default RLMM implementation creates producer and 
consumer instances. Common client properties can be configured with the 
`remote.log.metadata.common.client.` prefix. Users can also pass properties 
specific to the producer/consumer with the `remote.log.metadata.producer.` and 
`remote.log.metadata.consumer.` prefixes; these override properties with the 
`remote.log.metadata.common.client.` prefix.
Any other properties should be prefixed with "remote.log.metadata." and will 
be passed to RemoteLogMetadataManager#configure(Map<String, ?> props).
For example, the security configuration to connect to the local broker for 
the configured listener name is passed with these props.|

 

This is missing from the current implementation.
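
A minimal sketch of the intended prefix handling (illustrative only; whether 
the prefix is kept or stripped when passing to configure() is up to the 
implementation):
{code:java}
import java.util.HashMap;
import java.util.Map;

public final class RlmmConfigSketch {
    // Collect every "remote.log.metadata."-prefixed broker property so the
    // map can be handed to RemoteLogMetadataManager#configure(Map).
    static Map<String, Object> rlmmProps(final Map<String, Object> brokerProps) {
        final String prefix = "remote.log.metadata.";
        final Map<String, Object> out = new HashMap<>();
        for (final Map.Entry<String, Object> e : brokerProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
{code}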



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

2023-06-13 Thread via GitHub


clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1588950243

   Okay, the name has been changed and this has been rebased. I ran 
checkstyleTest and spotbugsTest and they are passing locally. If everything 
passes in the automated tests you should be able to merge it at your leisure!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito

2023-06-13 Thread via GitHub


clolov commented on PR #13712:
URL: https://github.com/apache/kafka/pull/13712#issuecomment-1588898530

   Thank you! I will now address the comments on the other one!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito

2023-06-13 Thread via GitHub


cadonna merged PR #13712:
URL: https://github.com/apache/kafka/pull/13712


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito

2023-06-13 Thread via GitHub


clolov commented on PR #13712:
URL: https://github.com/apache/kafka/pull/13712#issuecomment-1588826720

   Ah, sorry, yeah, I caused the confusion. Technically the order of merging 
between this and https://github.com/apache/kafka/pull/13711 doesn't matter as 
long as one is rebased on top of the other afterwards. This appears to be 
ready to merge, so let's merge this one and I will then update 13711.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests

2023-06-13 Thread via GitHub


dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1227742933


##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
 listenerName: ListenerName = listenerName,
 adminClientConfig: Properties = new Properties
   ): Unit = {
-if (isKRaftTest()) {
-  resource(createAdminClient(brokers, listenerName, adminClientConfig)) { 
admin =>
-TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
-  }
-} else {
-  TestUtils.createOffsetsTopic(zkClient, servers)

Review Comment:
   Is there a reason why we kept using the zk client here? It seems much better 
to use the admin client all the time unless explicitly stated otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13848: MINOR: Use admin client to create offsets topic

2023-06-13 Thread via GitHub


dajac opened a new pull request, #13848:
URL: https://github.com/apache/kafka/pull/13848

   I have seen failures like the following in a few builds:
   
   ```
   Build / JDK 11 and Scala 2.13 / testDescribeSimpleConsumerGroup() – 
kafka.admin.DescribeConsumerGroupTest
   org.apache.kafka.common.errors.TopicExistsException: Topic 
'__consumer_offsets' already exists.
   ```
   
   Many tests still use `TestUtils.createOffsetsTopic(zkClient, servers)` to 
create the offsets topic. This method does not handle the case where the topic 
already exists (e.g. on a retry). Instead of fixing it, I think it is better 
to use the admin client variant, which is more reliable.
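   For illustration, an idempotent creation along these lines (a sketch, not 
the actual TestUtils code; the partition count is illustrative):
   ```java
   import java.util.Collections;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.NewTopic;
   import org.apache.kafka.common.errors.TopicExistsException;

   static void createOffsetsTopicIdempotently(final Admin admin) throws Exception {
       final NewTopic topic = new NewTopic("__consumer_offsets", 50, (short) 1);
       try {
           admin.createTopics(Collections.singleton(topic)).all().get();
       } catch (final ExecutionException e) {
           // A retry may find the topic already created -- treat that as success.
           if (!(e.getCause() instanceof TopicExistsException)) {
               throw e;
           }
       }
   }
   ```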
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-13 Thread via GitHub


dajac merged PR #13795:
URL: https://github.com/apache/kafka/pull/13795


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

2023-06-13 Thread via GitHub


dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1227582802


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
 if (apiVersionRequest.hasUnsupportedRequestVersion) {
   requestHelper.sendResponseMaybeThrottle(request,
 requestThrottleMs => 
apiVersionRequest.getErrorResponse(requestThrottleMs, 
UNSUPPORTED_VERSION.exception))
-  CompletableFuture.completedFuture[Unit](())
 } else if (!apiVersionRequest.isValid) {
   requestHelper.sendResponseMaybeThrottle(request,
 requestThrottleMs => 
apiVersionRequest.getErrorResponse(requestThrottleMs, 
INVALID_REQUEST.exception))
-  CompletableFuture.completedFuture[Unit](())
 } else {
-  val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal, OptionalLong.empty())
-  controller.finalizedFeatures(context).handle { (result, exception) =>
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-  if (exception != null) {
-apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-  } else {
-apiVersionManager.apiVersionResponse(requestThrottleMs, 
result.featureMap().asScala.toMap, result.epoch())
-  }
-})
-  }
+  requestHelper.sendResponseMaybeThrottle(request,
+requestThrottleMs => 
apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
   I think this is inevitable; the broker also gets the feature data from the 
MetadataCache, which may also be stale.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org