[GitHub] [kafka] ben-manes commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
ben-manes commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1229087123 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position } + /** + * Close should synchronously cleanup the resources used by this cache. + * This index is closed when [[RemoteLogManager]] is closed. + */ def close(): Unit = { -closed = true -cleanerThread.shutdown() -// Close all the opened indexes. -lock synchronized { - entries.values().stream().forEach(entry => entry.close()) +// make close idempotent +if (!closed.getAndSet(true)) { + // Initiate shutdown for cleaning thread + val shutdownRequired = cleanerThread.initiateShutdown() + // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk. + internalCache.asMap().forEach((_, entry) => entry.close()) + // Perform any pending activities required by the cache for cleanup + internalCache.cleanUp() Review Comment: this would do pending maintenance work like expiration. Perhaps you meant `invalidateAll()` to discard all of the entries? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
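For readers unfamiliar with Caffeine, here is a minimal sketch of the distinction being pointed out above (illustrative only, using Caffeine's public API rather than the actual Kafka code): `cleanUp()` merely runs pending maintenance such as evicting expired entries, while `invalidateAll()` actually discards every entry.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class CloseSketch {
    // Entry is a stand-in for the cached index entry; not the real Kafka class.
    static final class Entry implements AutoCloseable {
        @Override public void close() { /* release mmap'd index resources */ }
    }

    public static void main(String[] args) {
        Cache<String, Entry> cache = Caffeine.newBuilder().maximumSize(1024).build();
        cache.put("segment-uuid", new Entry());

        // Close each entry explicitly, then drop them from the cache.
        cache.asMap().forEach((uuid, entry) -> entry.close());
        cache.invalidateAll(); // discards all entries (and notifies any removal listener)
        cache.cleanUp();       // only flushes pending maintenance work (e.g. expiration)
    }
}
```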
[GitHub] [kafka] ben-manes commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
ben-manes commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1229083529 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -152,8 +194,7 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM val txnIndex = new TransactionIndex(offset, txnIndexFile) txnIndex.sanityCheck() - val entry = new Entry(offsetIndex, timeIndex, txnIndex) - entries.put(uuid, entry) + internalCache.put(uuid, new Entry(offsetIndex, timeIndex, txnIndex)) Review Comment: it may not be desirable here, but typically you should prefer to use a computation, `cache.get(key, mappingFunction)`, to make the loading atomic and avoid a cache stampede. Otherwise a race could allow duplicate work if not protected elsewhere. The downside is that blocking might not be wanted, there may be a need for recursive writes, etc., in which case an `AsyncCache` can help work around those by decoupling the map from the computation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
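To make the suggestion concrete, here is a small, self-contained sketch of the atomic-computation pattern (the loader method is a hypothetical stand-in for the remote-storage fetch; this is not the PR's code):

```java
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.CompletableFuture;

public final class AtomicLoadSketch {
    // Hypothetical stand-in for fetching and parsing an index from remote storage.
    static byte[] loadFromRemoteStorage(String segmentUuid) {
        return new byte[0];
    }

    public static void main(String[] args) {
        // Blocking variant: only one thread runs the mapping function per key,
        // so concurrent callers cannot duplicate the expensive load (no stampede).
        Cache<String, byte[]> cache = Caffeine.newBuilder().maximumSize(1024).build();
        byte[] index = cache.get("segment-uuid", AtomicLoadSketch::loadFromRemoteStorage);

        // AsyncCache variant: the map holds a future and the computation is decoupled
        // from it, which avoids blocking callers and sidesteps recursive-write issues.
        AsyncCache<String, byte[]> asyncCache =
                Caffeine.newBuilder().maximumSize(1024).buildAsync();
        CompletableFuture<byte[]> pending =
                asyncCache.get("segment-uuid", AtomicLoadSketch::loadFromRemoteStorage);
    }
}
```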
[GitHub] [kafka] ben-manes commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
ben-manes commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1229081529 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -136,12 +176,14 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM val offset = name.substring(0, firstIndex).toInt val uuid = Uuid.fromString(name.substring(firstIndex + 1, name.lastIndexOf('_'))) - if(!entries.containsKey(uuid)) { + if (internalCache.getIfPresent(uuid) == null) { Review Comment: did you mean `internalCache.asMap().containsKey(uuid)`? That would be a plain existence check, whereas this is an entry read and therefore counts as a hit or miss for the cache's eviction policy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
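A tiny sketch of the difference, assuming a plain Caffeine cache (illustrative names, not the Kafka code): per the comment above, `getIfPresent` is a policy read that registers a hit or miss, while `asMap().containsKey` is a pure existence check against the backing map.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class ReadVsContains {
    public static void main(String[] args) {
        Cache<String, String> cache =
                Caffeine.newBuilder().maximumSize(1024).recordStats().build();
        cache.put("uuid-1", "entry");

        // Pure existence check on the backing map: no hit/miss is recorded.
        boolean exists = cache.asMap().containsKey("uuid-1");

        // An entry read: registers a hit (or a miss) with the cache's policy/stats.
        String entry = cache.getIfPresent("uuid-1");

        System.out.println(exists + " hits=" + cache.stats().hitCount());
    }
}
```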
[GitHub] [kafka] sagarrao12 commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …
sagarrao12 commented on PR #13453: URL: https://github.com/apache/kafka/pull/13453#issuecomment-1590511123 @enzo-cappa , thanks.. yeah waiting for a round of review on this one... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
satishd commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1590492505 > I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created [KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue. @showuon This is no longer valid; the KIP needs to be updated with the prefix-based configs for RSM and RLMM. I will update the KIP with those details. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #13845: KAFKA-15078; KRaft leader replys with snapshot for offset 0
dengziming commented on code in PR #13845: URL: https://github.com/apache/kafka/pull/13845#discussion_r1228931662 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest( long fetchOffset = request.fetchOffset(); int lastFetchedEpoch = request.lastFetchedEpoch(); LeaderState state = quorum.leaderStateOrThrow(); -ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + +Optional latestSnapshotId = log.latestSnapshotId(); +final ValidOffsetAndEpoch validOffsetAndEpoch; +if (fetchOffset == 0 && latestSnapshotId.isPresent()) { Review Comment: I have tried to figure out some other cases where we would prefer to read the snapshot; a common case is when `highWatermark` - `fetchOffset` > count(snapshot records). Do you think it's possible to write count(snapshot records) in the snapshot header? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lihaosky opened a new pull request, #13851: check enable rack
lihaosky opened a new pull request, #13851: URL: https://github.com/apache/kafka/pull/13851 ## Description Initial implementation to check if rack aware assignment can be enabled ## Test TODO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1590237911 unit.kafka.server.AddPartitionsToTxnManagerTest.testAddPartitionsToTxnManagerMetrics() is failing. I was a bit worried this could be flaky. Will investigate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
jolshan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1590186665 With respect to the retries: there is currently a known error where, if we produce using an idempotent producer, we can get stuck in a retry loop. 1. Request A with sequence 0 fails with a retriable error. 2. Request B with sequence 1 succeeds, because we currently don't assert that the sequence is 0 on the first records written (or on any records written when we don't yet have producer state). 3. Request A will be retried and will always fail with a retriable OutOfOrderSequence error. Now imagine that at step 2 we have a request C that fails with a non-retriable error. Will we get stuck waiting for Request A until the retries run out? The default is pretty high, so this could go on for some time. BTW this issue is https://issues.apache.org/jira/browse/KAFKA-14359, and we are still considering the best way to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1228732870 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -240,16 +240,17 @@ object RequestChannel extends Logging { val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = + val metricNames = if (header.apiKey == ApiKeys.FETCH) { - val isFromFollower = body[FetchRequest].isFromFollower - Seq( -if (isFromFollower) RequestMetrics.followFetchMetricName + val specifiedMetricName = +if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName - ) + Seq(specifiedMetricName, header.apiKey.name) +} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].verifyOnlyRequest()) { +Seq(RequestMetrics.verifyPartitionsInTxnMetricName) +} else { + Seq(header.apiKey.name) } -else Seq.empty - val metricNames = fetchMetricNames :+ header.apiKey.name Review Comment: Note: Like fetch metrics, both verification and nonverification addPartitionToTxn requests will count towards the rate for the request type. This is because the metric is keyed on the apiKey, and I can't distinguish the two. See below: ``` m.requestRate(header.apiVersion).mark() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1228729149 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3405,6 +3405,59 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } +@Test +public void testBackgroundInvalidStateTransitionIsFatal() { +doInitTransactions(); +assertTrue(transactionManager.isTransactional()); + +transactionManager.setPoisonStateOnInvalidTransition(true); Review Comment: This works because we create a new transactionManager object per run of the test (ie, the thread local variable isn't reused between tests) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1228731391 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -85,12 +93,14 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time topicPartitionsToError.put(new TopicPartition(topic.name, partition), error) } } +verificationFailureRate.mark(topicPartitionsToError.size) topicPartitionsToError.toMap } private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. + verificationTimeMs.update(time.milliseconds() - transactionDataAndCallbacks.earliestAdditionMs) Review Comment: I've decided to fix this to have an update per produce request that comes into the manager. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1590069645 Thanks @kirktrue, this looks reasonable. Let's fix the conflict and let tests run one more time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228495958 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); Review Comment: Why is it preferable for the root to be initialized as the inner store, rather than this outer store? (Just trying to understand.) It looks like most of the other stores use `root` in this method and leave it up to the caller to set the root accordingly, usually as the store itself if it's an outermost store. Curious why this store is different. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common
divijvaidya commented on PR #13820: URL: https://github.com/apache/kafka/pull/13820#issuecomment-1589745577 Hi @dajac , yes let me take a look first thing tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on PR #13820: URL: https://github.com/apache/kafka/pull/13820#issuecomment-1589741122 @divijvaidya As you started reviewing it, are you interested in finishing the review? That would unblock me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228430026 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bu
[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on PR #13756: URL: https://github.com/apache/kafka/pull/13756#issuecomment-1589678126 @cadonna I also hid the segments as you proposed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests
dajac commented on code in PR #13848: URL: https://github.com/apache/kafka/pull/13848#discussion_r1228424644 ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { listenerName: ListenerName = listenerName, adminClientConfig: Properties = new Properties ): Unit = { -if (isKRaftTest()) { - resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => -TestUtils.createOffsetsTopicWithAdmin(admin, brokers) - } -} else { - TestUtils.createOffsetsTopic(zkClient, servers) Review Comment: Ouch. Many tests have failed without this. I will investigate... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228419948 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: I don't mind obscuring the implementation and setting those values as default instead of exposing them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1228360963 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +if (wrapped().observedStreamTime - gracePeriod > minTimestamp) { +return; +} + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSiz
[GitHub] [kafka] divijvaidya opened a new pull request, #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya opened a new pull request, #13850: URL: https://github.com/apache/kafka/pull/13850 ## Problem RemoteIndexCache is accessed from multiple threads concurrently in the fetch-from-consumer code path [1]. Currently, the RemoteIndexCache uses LinkedHashMap as the cache implementation internally. Since LinkedHashMap is not a thread-safe data structure, we use a coarse-grained lock on the entire map/cache when writing to the cache. This means that if a thread is fetching information for a particular segment from RemoteStorageManager, other threads that are trying to access a different segment from the cache will also wait for the former thread to complete. This is due to the usage of a global lock in the cache. This lock contention leads to a decrease in throughput for fetch-from-consumer in cases where the RSM network call may take more time. ## Solution We need a data structure for the cache which satisfies the following requirements: 1. Multiple threads should be able to read concurrently. 2. Fetch for missing keys should not block reads for available keys. 3. Only one thread should fetch for a specific key. 4. Should support an LRU policy. In Java, all non-concurrent data structures (such as LinkedHashMap) violate condition 2. We could potentially use concurrent data structures such as ConcurrentHashMap, but we would have to implement the LRU eviction ourselves on top of them. Or we could implement an LRU cache from scratch ourselves that satisfies the above constraints. Alternatively (the approach taken in this PR), we can use [Caffeine cache](https://github.com/ben-manes/caffeine), which satisfies all the requirements mentioned above. ## Changes - This PR uses Caffeine as the underlying cache for RemoteIndexCache. - The old `File` API has been replaced with the `Files` API introduced in JDK 7. ## Testing - A test has been added which verifies requirement 2 above. The test fails prior to the change and succeeds after it. - New tests have been added to improve overall test coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
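As a rough illustration of how the stated requirements map onto Caffeine (a hedged sketch only; the class and entry names are invented and the exact wiring in the PR may differ):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public final class IndexCacheSketch {
    // Stand-in for the cached offset/time/transaction indexes.
    static final class Entry implements AutoCloseable {
        @Override public void close() { /* unload mmap memory; files stay on disk */ }
    }

    // Bounded, size-based (LRU-like) cache: reads do not take a global lock, and a
    // removal listener closes evicted entries (wiring assumed, not copied from the PR).
    static Cache<String, Entry> newCache(long maxSize) {
        return Caffeine.newBuilder()
                .maximumSize(maxSize)
                .removalListener((String key, Entry entry, RemovalCause cause) -> {
                    if (entry != null) {
                        entry.close();
                    }
                })
                .build();
    }

    public static void main(String[] args) {
        Cache<String, Entry> cache = newCache(1024);
        // Only one thread computes the value for a missing key, and readers of other
        // keys are never blocked by this load (requirements 1-3 above).
        Entry entry = cache.get("segment-uuid", key -> new Entry());
    }
}
```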
[jira] [Created] (KAFKA-15084) Remove lock contention in RemoteIndexCache
Divij Vaidya created KAFKA-15084: Summary: Remove lock contention in RemoteIndexCache Key: KAFKA-15084 URL: https://issues.apache.org/jira/browse/KAFKA-15084 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 3.6.0 Reporter: Divij Vaidya Assignee: Divij Vaidya Fix For: 3.6.0 RemoteIndexCache is accessed from multiple threads concurrently in the fetch-from-consumer code path [1]. Currently, the RemoteIndexCache uses LinkedHashMap as the cache implementation internally. Since LinkedHashMap is not a thread-safe data structure, we use a coarse-grained lock on the entire map/cache when writing to the cache. This means that if a thread is fetching information for a particular segment from RemoteStorageManager, other threads that are trying to access a different segment from the cache will also wait for the former thread to complete. This is due to the usage of a global lock in the cache. This lock contention leads to a decrease in throughput for fetch-from-consumer in cases where the RSM network call may take more time. As a goal for this JIRA, we would like to ensure that threads reading existing values from the cache do not get blocked while the thread updating the cache is fetching data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on pull request #13849: Add 3.5.0 and 3.4.1 to system tests
mimaison commented on PR #13849: URL: https://github.com/apache/kafka/pull/13849#issuecomment-1589399101 @mjsax @vvcephei Can you upload the 3.5.0 artifacts to the Confluent S3 bucket as per the [release instructions](https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses)? And verify the 3.4.1 artifacts are there too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15080) Fetcher's lag never set when partition is idle
[ https://issues.apache.org/jira/browse/KAFKA-15080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15080. - Fix Version/s: 3.5.1 3.6.0 Resolution: Fixed > Fetcher's lag never set when partition is idle > -- > > Key: KAFKA-15080 > URL: https://issues.apache.org/jira/browse/KAFKA-15080 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.5.1, 3.6.0 > > > The PartitionFetchState's lag field is set to None when the state is created > and it is updated when bytes are received for a partition. For idle > partitions (newly created or not), the lag is never updated because > `validBytes > 0` is never true. As a side effect, the partition is considered > out-of-sync and could be incorrectly throttled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1228123874 ## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java: ## @@ -195,9 +235,658 @@ public void iterateTopics(EnumSet interests, TopicVisitor (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); writer.handleSnapshot(image, consumer); assertEquals(1, opCounts.remove("CreateTopic")); -assertEquals(1, opCounts.remove("UpdatePartition")); +assertEquals(1, opCounts.remove("UpdatePartitions")); assertEquals(1, opCounts.remove("UpdateTopic")); assertEquals(0, opCounts.size()); assertEquals("bar", topicClient.createdTopics.get(0)); } + +@Test +public void testDeleteTopicFromSnapshot() { +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap()); +} +}; +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); + +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +Map opCounts = new HashMap<>(); +KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, +(logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); +writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(0, opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); + +opCounts.clear(); +topicClient.reset(); +writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(2, opCounts.remove("CreateTopic")); +assertEquals(0, opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); +assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics); +} + +@FunctionalInterface +interface TopicVerifier { +void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer); +} + +void setupTopicWithTwoPartitions(TopicVerifier verifier) { +// Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier +Uuid topicId = Uuid.randomUuid(); +Map partitionMap = new HashMap<>(); +partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1)); +partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1)); + +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +Map> assignments = new HashMap<>(); +assignments.put(0, Arrays.asList(2, 3, 4)); +assignments.put(1, Arrays.asList(3, 4, 5)); +visitor.visitTopic("spam", topicId, assignments); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0)); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1)); +} +}; + +CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); +delta.replay(new TopicRecord().setTopicId(topicId).setName("spam")); +delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message()); +delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 1).message()); +TopicsImage image = delta.apply(); + +verifier.verify(topicId, image, topicClient, writer); +} + +@Test +public void testUpdatePartitionsFromSnapshot() { +setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { +
[GitHub] [kafka] dajac merged pull request #13843: KAFKA-15080; Fetcher's lag never set when partition is idle
dajac merged PR #13843: URL: https://github.com/apache/kafka/pull/13843 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
showuon commented on code in PR #13760: URL: https://github.com/apache/kafka/pull/13760#discussion_r1228079251 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2925,123 +2919,11 @@ void handleFailure(Throwable throwable) { @Override public DeleteRecordsResult deleteRecords(final Map recordsToDelete, final DeleteRecordsOptions options) { +AdminApiFuture.SimpleAdminApiFuture future = DeleteRecordsHandler.newFuture(recordsToDelete.keySet()); Review Comment: nit: `AdminApiFuture.SimpleAdminApiFuture` -> `SimpleAdminApiFuture` should work. ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.admin.DeletedRecords; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.DeleteRecordsRequestData; +import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteRecordsRequest; +import org.apache.kafka.common.requests.DeleteRecordsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class DeleteRecordsHandler extends Batched { + +private final Map recordsToDelete; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public DeleteRecordsHandler( +Map recordsToDelete, +LogContext logContext +) { +this.recordsToDelete = recordsToDelete; +this.log = logContext.logger(DeleteRecordsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "deleteRecords"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +public static SimpleAdminApiFuture newFuture( +Collection topicPartitions +) { +return AdminApiFuture.forKeys(new HashSet<>(topicPartitions)); +} + +@Override +public 
DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map deletionsForTopic = new HashMap<>(); +for (Map.Entry entry: recordsToDelete.entrySet()) { +TopicPartition topicPartition = entry.getKey(); +DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.computeIfAbsent( +topicPartition.topic(), +key -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic()) +); +deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition() +.setPartitionIndex(topicPartition.partition()) +.setOffset(entry.getValue().beforeOffset())); +} + +DeleteRecordsRequestData data = new DeleteRecordsRequestData() +.setTopics(new ArrayList<>(deletionsForTopic.values())); Review Comment: Should we set timeout here like before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-u
[GitHub] [kafka] dajac merged pull request #13844: MINOR: Make sure replicas will not be removed in initial ISR
dajac merged PR #13844: URL: https://github.com/apache/kafka/pull/13844 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732032#comment-17732032 ] Bruno Cadonna commented on KAFKA-12679: --- [~Andras Hatvani] Lucas' PR solves the issue for the state updater (aka new restoration architecture). It does not solve it for the current initialization. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Priority: Major > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store director while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732025#comment-17732025 ] Andras Hatvani edited comment on KAFKA-12679 at 6/13/23 11:08 AM: -- [~lbrutschy] [~mjsax] This is a pressing issue for my client as I could only work it around by reducing {{num.stream.threads}} to {{1}} which shouldn't be necessary. was (Author: andras hatvani): [~lbrutschy] [~mjsax] This is a pressing issue for my client and I've worked it around by reducing `num.stream.threads` to `1` which shouldn't be necessary. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Priority: Major > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store director while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732025#comment-17732025 ] Andras Hatvani commented on KAFKA-12679: [~lbrutschy] [~mjsax] This is a pressing issue for my client and I've worked it around by reducing `num.stream.threads` to `1` which shouldn't be necessary. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Priority: Major > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store director while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
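For readers skimming the quoted issue text: the attached patch's idea is simply to back off between directory-lock attempts instead of spinning in a tight loop. A hedged, illustrative sketch of that pattern (the helper method and the backoff value are assumptions, not the actual patch):

{code:java}
public final class LockBackoffSketch {
    // Hypothetical stand-in for the real directory-lock attempt, which can fail while
    // the previous stream thread still holds the lock on the task's state directory.
    static boolean tryLockStateDirectory(String taskId) {
        return true; // placeholder so the sketch is runnable
    }

    static void lockWithBackoff(String taskId, long backoffMs) throws InterruptedException {
        while (!tryLockStateDirectory(taskId)) {
            // Sleep instead of retrying immediately, so the lock loop cannot starve
            // the thread that still owns the directory.
            Thread.sleep(backoffMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        lockWithBackoff("0_34", 100L); // 100 ms backoff is an assumed value
    }
}
{code}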
[GitHub] [kafka] cadonna commented on pull request #12875: KAFKA-12679: Handle lock exceptions in state updater
cadonna commented on PR #12875: URL: https://github.com/apache/kafka/pull/12875#issuecomment-1589078205 @andrashatvani Do you mean the state updater? I am currently working on enabling the state updater by default on trunk. We plan to officially release it in 3.6. However, you can already test it in 3.5 by setting the internal config `__state.updater.enabled__` to true. Note that we recently found and [fixed a bug](https://github.com/apache/kafka/pull/13829) in the state updater. The fix is only on trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
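For anyone who wants to try the flag mentioned above, a hedged sketch of passing it through the normal Streams properties. The application id and bootstrap servers are placeholders, and the config is internal, so it may change or disappear without notice.
```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public final class StateUpdaterOptIn {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Internal, not-yet-public switch mentioned above; only meaningful on 3.5+.
        props.put("__state.updater.enabled__", true);
        // Pass `props` to the KafkaStreams constructor together with the topology as usual.
        System.out.println(props);
    }
}
```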
[GitHub] [kafka] mimaison opened a new pull request, #13849: Add 3.5.0 and 3.4.1 to system tests
mimaison opened a new pull request, #13849: URL: https://github.com/apache/kafka/pull/13849 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
[ https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732020#comment-17732020 ] çağla boynueğri commented on KAFKA-14257: - all the brokers must have the same CLUSTER_ID, as for anyone wondering :) > Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response > - > > Key: KAFKA-14257 > URL: https://issues.apache.org/jira/browse/KAFKA-14257 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.2.3 >Reporter: jianbin.chen >Priority: Major > > Please help me see why the error message is output indefinitely > broker1: > {code:java} > process.roles=broker,controller > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=1 > listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093 > inter.broker.listener.name=PLAINTEXT > advertised.listeners=PLAINTEXT://192.168.6.57:9092 > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker2 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=2 > listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker3 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > node.id=3 > listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs > {code} > error msg: > {code:java} > [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: 
InboundResponse(correlationId=388, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
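On the comment above: in KRaft mode each node records its cluster id in a meta.properties file under log.dirs, so one way to confirm the brokers agree is to compare that value across nodes. A small, hypothetical helper (not part of Kafka) that prints it for each path passed in:
{code:java}
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public final class ClusterIdCheck {
    // Usage (hypothetical): java ClusterIdCheck /data01/kafka323-logs/meta.properties ...
    public static void main(final String[] args) throws IOException {
        for (final String path : args) {
            final Properties meta = new Properties();
            try (FileInputStream in = new FileInputStream(path)) {
                meta.load(in);
            }
            // Every node listed in controller.quorum.voters must report the same cluster.id,
            // otherwise VOTE requests are rejected with INCONSISTENT_CLUSTER_ID.
            System.out.println(path + " -> cluster.id=" + meta.getProperty("cluster.id"));
        }
    }
}
{code}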
[jira] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
[ https://issues.apache.org/jira/browse/KAFKA-14257 ] çağla boynueğri deleted comment on KAFKA-14257: - was (Author: JIRAUSER300861): Hi, I am getting the same error, can you remember how did you solve the issue ? > Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response > - > > Key: KAFKA-14257 > URL: https://issues.apache.org/jira/browse/KAFKA-14257 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.2.3 >Reporter: jianbin.chen >Priority: Major > > Please help me see why the error message is output indefinitely > broker1: > {code:java} > process.roles=broker,controller > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=1 > listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093 > inter.broker.listener.name=PLAINTEXT > advertised.listeners=PLAINTEXT://192.168.6.57:9092 > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker2 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=2 > listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker3 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > node.id=3 > listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs > {code} > error msg: > {code:java} > [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, > data=VoteResponseData(errorCode=104, topics=[]), 
sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
[ https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732014#comment-17732014 ] çağla boynueğri commented on KAFKA-14257: - Hi, I am getting the same error, can you remember how did you solve the issue ? > Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response > - > > Key: KAFKA-14257 > URL: https://issues.apache.org/jira/browse/KAFKA-14257 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.2.3 >Reporter: jianbin.chen >Priority: Major > > Please help me see why the error message is output indefinitely > broker1: > {code:java} > process.roles=broker,controller > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=1 > listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093 > inter.broker.listener.name=PLAINTEXT > advertised.listeners=PLAINTEXT://192.168.6.57:9092 > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker2 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > node.id=2 > listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs{code} > broker3 > {code:java} > process.roles=broker,controller > controller.listener.names=CONTROLLER > num.io.threads=8 > num.network.threads=5 > node.id=3 > listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093 > inter.broker.listener.name=PLAINTEXT > controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093 > log.dirs=/data01/kafka323-logs > {code} > error msg: > {code:java} > [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient) > [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error > INCONSISTENT_CLUSTER_ID in VOTE response: 
InboundResponse(correlationId=388, > data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) > (org.apache.kafka.raft.KafkaRaftClient){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
cadonna commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1227894014 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: @wcarlson5 Sorry for commenting again on the segmented state store, but I really want to understand, since these stores are quite new to me. As far as I understand, you extend `AbstractRocksDBTimeOrderedSegmentedBytesStore`, which is a state store that stores records in timestamp order in segments and keeps an index on the record key. It seems to me that you use neither the segments nor the index. I understand that `AbstractRocksDBTimeOrderedSegmentedBytesStore` implements interface `SegmentedByteStore`, which gives you the ability to fetch data by time ranges. Furthermore, `AbstractRocksDBTimeOrderedSegmentedBytesStore` maintains the observed stream time, which is also something that comes in handy for your PR. My proposal is to hide the segmented aspect of the state store as much as possible. More precisely: - Rename `RocksDBTimeOrderedKeyValueSegmentedBytesStore` to `RocksDBTimeOrderedKeyValueBytesStore` - Do not expose `retention` and `segmentInterval` in `RocksDBTimeOrderedKeyValueBytesStore`'s constructor and in the corresponding supplier. - Set both `retention` and `segmentInterval` in the call to the super constructor to high values as @vcrfxia suggested. That way, you can use the features you need from the time-ordered segmented state store and at the same time hide all the aspects that you do not need behind your abstraction. WDYT? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
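To make the "hide the segmented aspect" proposal concrete, here is a self-contained toy sketch. All class names and constructor shapes are stand-ins, not the actual Kafka Streams classes or signatures: the point is only that the outer store pins retention and segmentInterval to effectively infinite values internally instead of exposing them to callers.
```java
// Hypothetical base class standing in for the segmented, time-ordered bytes store.
class ToySegmentedBytesStore {
    final long retention;
    final long segmentInterval;

    ToySegmentedBytesStore(final long retention, final long segmentInterval) {
        this.retention = retention;
        this.segmentInterval = segmentInterval;
    }
}

// The buffer-facing store hides the segmentation knobs entirely, as the review suggests:
// callers never pass retention/segmentInterval; they are fixed to very large values.
final class ToyTimeOrderedKeyValueBytesStore extends ToySegmentedBytesStore {
    ToyTimeOrderedKeyValueBytesStore() {
        super(Long.MAX_VALUE, Long.MAX_VALUE);
    }
}

public final class HideSegmentsSketch {
    public static void main(final String[] args) {
        final ToyTimeOrderedKeyValueBytesStore store = new ToyTimeOrderedKeyValueBytesStore();
        System.out.println("retention=" + store.retention + ", segmentInterval=" + store.segmentInterval);
    }
}
```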
[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1588961245 @divijvaidya @satishd , PR updated. Thanks. > 1. We probably want to update the KIP-405 here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs and specify that this config is optional Updated. > 2. From the javadoc of RLMM > When this is configured all other > required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener > Can we please add a test to verify this? (asking because while constructing the rlmmProps, we don't pass any other configs with the listener prefix) I think we don't have this implemented. We should pass `remote.log.metadata.*` into RLMM based on KIP-405. Created [KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon commented on code in PR #13828: URL: https://github.com/apache/kafka/pull/13828#discussion_r1227858635 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -474,7 +474,14 @@ class BrokerServer( new KafkaConfig(config.originals(), true) // Start RemoteLogManager before broker start serving the requests. - remoteLogManager.foreach(_.startup()) + remoteLogManagerOpt.foreach(rlm => { +val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()) +endpoints.stream.filter(e => e.listenerName.equals(listenerName)) + .findAny() Review Comment: Agree. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
Luke Chen created KAFKA-15083: - Summary: Passing "remote.log.metadata.*" configs into RLMM Key: KAFKA-15083 URL: https://issues.apache.org/jira/browse/KAFKA-15083 Project: Kafka Issue Type: Sub-task Reporter: Luke Chen Based on the [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]: |_remote.log.metadata.*_|Default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass properties specific to the producer/consumer with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes. These will override properties with the `remote.log.metadata.common.client.` prefix. Any other properties should be prefixed with "remote.log.metadata." and these will be passed to RemoteLogMetadataManager#configure(Map props). For example: security configuration to connect to the local broker for the configured listener name is passed with these props.| This is missing from the current implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
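Purely as an illustration of how the KIP-405 prefixes compose (whether the broker actually forwards them to RemoteLogMetadataManager#configure is exactly what this ticket tracks), a sketch with made-up example values; the last key is invented for illustration only.
{code:java}
import java.util.Properties;

public final class RlmmPrefixSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Shared client settings for the default RLMM's internal producer and consumer.
        props.put("remote.log.metadata.common.client.security.protocol", "PLAINTEXT");
        // Producer-/consumer-specific settings override the common client prefix.
        props.put("remote.log.metadata.producer.acks", "all");
        props.put("remote.log.metadata.consumer.fetch.max.wait.ms", "500");
        // Anything else under "remote.log.metadata." should reach RemoteLogMetadataManager#configure.
        props.put("remote.log.metadata.custom.setting", "example-value"); // made-up key for illustration
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}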
[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito
clolov commented on PR #13711: URL: https://github.com/apache/kafka/pull/13711#issuecomment-1588950243 Okay, the name has been changed and this has been rebased. I ran checkstyleTest and spotbugsTest and they are passing locally. If everything passes in the automated tests you should be able to merge it at your leisure! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito
clolov commented on PR #13712: URL: https://github.com/apache/kafka/pull/13712#issuecomment-1588898530 Thank you! I will now address the comments on the other one! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito
cadonna merged PR #13712: URL: https://github.com/apache/kafka/pull/13712 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito
clolov commented on PR #13712: URL: https://github.com/apache/kafka/pull/13712#issuecomment-1588826720 Ah, sorry, yeah, I have caused the confusion. Technically the order of merging between this and https://github.com/apache/kafka/pull/13711 doesn't matter as long as one is rebased on top of the other afterwards. This appears to be ready to merge so let's go with merging this and I will then update 13711. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests
dajac commented on code in PR #13848: URL: https://github.com/apache/kafka/pull/13848#discussion_r1227742933 ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { listenerName: ListenerName = listenerName, adminClientConfig: Properties = new Properties ): Unit = { -if (isKRaftTest()) { - resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => -TestUtils.createOffsetsTopicWithAdmin(admin, brokers) - } -} else { - TestUtils.createOffsetsTopic(zkClient, servers) Review Comment: Is there a reason why we kept using the zk client here? It seems much better to use the admin client all the time unless explicitly stated otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request, #13848: MINOR: Use admin client to create offsets topic
dajac opened a new pull request, #13848: URL: https://github.com/apache/kafka/pull/13848 I have seen failures like the following in a few builds: ``` Build / JDK 11 and Scala 2.13 / testDescribeSimpleConsumerGroup() – kafka.admin.DescribeConsumerGroupTest org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists. ``` Many tests still use `TestUtils.createOffsetsTopic(zkClient, servers)` to create the offsets topic. This method does not handle the case where the topic already exists (e.g. in the case of a retry). Instead of fixing it, I think it is just better to use the admin client variant, which is more reliable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
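Not the PR's code, just a hedged sketch of why the admin-client path copes better with retries: topic creation can treat TopicExistsException as success. The bootstrap server, partition count, and replication factor below are illustrative placeholders.
```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public final class CreateOffsetsTopicIdempotently {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            final NewTopic offsetsTopic = new NewTopic("__consumer_offsets", 50, (short) 3);
            try {
                admin.createTopics(Collections.singleton(offsetsTopic)).all().get();
            } catch (final ExecutionException e) {
                // A retry (or a concurrent test) may find the topic already there; that is fine.
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
            }
        }
    }
}
```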
[GitHub] [kafka] dajac merged pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
dajac merged PR #13795: URL: https://github.com/apache/kafka/pull/13795 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org