[jira] [Created] (KAFKA-14704) Follower should truncate before incrementing high watermark
David Jacot created KAFKA-14704:
---

Summary: Follower should truncate before incrementing high watermark
Key: KAFKA-14704
URL: https://issues.apache.org/jira/browse/KAFKA-14704
Project: Kafka
Issue Type: Bug
Reporter: David Jacot
Assignee: David Jacot

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
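The ticket only states its summary. The toy model below illustrates the ordering that summary asks for. All class and method names are hypothetical, and this is not Kafka's `ReplicaFetcherThread` or `UnifiedLog`. When the follower handles a fetch response, it first truncates to the offset where its log diverges from the leader. It then appends, and only after that does it advance the high watermark, capped at its own log end offset. If the high watermark were advanced before truncating, it could briefly cover offsets that the truncation then removes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a follower replica. All names are hypothetical; it only
// illustrates the ordering "truncate, then advance the high watermark".
class ToyFollower {
    final List<String> log = new ArrayList<>();
    long highWatermark = 0;

    long logEndOffset() {
        return log.size();
    }

    // divergingEndOffset < 0 means the leader reported no divergence.
    void onFetchResponse(long divergingEndOffset, List<String> records, long leaderHighWatermark) {
        // 1. Truncate first: drop local records the leader does not have.
        if (divergingEndOffset >= 0 && divergingEndOffset < logEndOffset()) {
            log.subList((int) divergingEndOffset, log.size()).clear();
            // The high watermark may never point past the new log end offset.
            highWatermark = Math.min(highWatermark, logEndOffset());
        }
        // 2. Append whatever the leader sent.
        log.addAll(records);
        // 3. Only now advance the high watermark, bounded by the local log end offset.
        long candidate = Math.min(leaderHighWatermark, logEndOffset());
        highWatermark = Math.max(highWatermark, candidate);
    }
}
```

Consider a follower holding five records with a high watermark of 3 that learns its log diverges at offset 3. It truncates to three records and keeps its high watermark at 3, even though the leader reports 4.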
[GitHub] [kafka] lookingformira opened a new pull request, #13229: 3.2
lookingformira opened a new pull request, #13229:
URL: https://github.com/apache/kafka/pull/13229

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Resolved] (KAFKA-14513) Add broker side PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-14513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-14513.
-
Resolution: Fixed

> Add broker side PartitionAssignor interface
> ---
>
> Key: KAFKA-14513
> URL: https://issues.apache.org/jira/browse/KAFKA-14513
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
[GitHub] [kafka] dajac merged pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac merged PR #13202: URL: https://github.com/apache/kafka/pull/13202
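KAFKA-14513 adds a broker-side assignor interface. Its actual shape is defined in PR #13202 and is not reproduced here. To illustrate what such a plug-in does, the sketch below uses a hypothetical interface, `SketchPartitionAssignor`, which is not Kafka's. It maps member subscriptions and per-topic partition counts to per-member assignments. The example implementation uses a simple range strategy: each topic's partitions are split into contiguous ranges, and the first `partitions % members` members each receive one extra partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical interface for illustration only; NOT the interface merged in PR #13202.
interface SketchPartitionAssignor {
    String name();

    // subscriptions: memberId -> subscribed topics; partitionCounts: topic -> number of partitions.
    // Returns memberId -> assigned "topic-partition" strings.
    Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                     Map<String, Integer> partitionCounts);
}

class SketchRangeAssignor implements SketchPartitionAssignor {
    @Override
    public String name() {
        return "sketch-range";
    }

    @Override
    public Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionCounts) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>());
        }
        // Iterate topics and members in sorted order so the result is deterministic.
        for (String topic : new TreeSet<>(partitionCounts.keySet())) {
            List<String> members = new ArrayList<>();
            for (Map.Entry<String, Set<String>> e : new TreeMap<>(subscriptions).entrySet()) {
                if (e.getValue().contains(topic)) {
                    members.add(e.getKey());
                }
            }
            if (members.isEmpty()) {
                continue;
            }
            int partitions = partitionCounts.get(topic);
            int perMember = partitions / members.size();
            int extra = partitions % members.size();
            int next = 0;
            for (int i = 0; i < members.size(); i++) {
                int count = perMember + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    result.get(members.get(i)).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }
}
```

With two members subscribed to a three-partition topic, the first member receives partitions 0 and 1 and the second receives partition 2.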
[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )
calmera commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1102355418

##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -54,7 +54,7 @@
  * The consumer configuration keys
  */
 public class ConsumerConfig extends AbstractConfig {
-    private static final ConfigDef CONFIG;
+    protected static final ConfigDef CONFIG;

Review Comment:
Hmm, looks like my reverse consumer work got committed by accident. Will clean it up and go through your comments.
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102262205

##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -383,6 +389,114 @@ public void shouldDistinguishEmptyAndNull() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldRestore() {
+        final List records = new ArrayList<>();
+        records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+        records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+    }
+
+    @Test
+    public void shouldRestoreWithNulls() {
+        final List records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreWithNullsAndRepeatTimestamps() {
+        final List records = new ArrayList<>();
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));

Review Comment:
Similar to N/5 -- hard to keep track without writing it down. Can we add some comments?
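One way to "write it down" is a tiny reference model of the read semantics these tests exercise. The `VersionedKeyModel` class below is a hypothetical, simplified in-memory model for a single key. It ignores history retention, segments, and serialization. It assumes, as the test name `shouldRestoreWithNullsAndRepeatTimestamps` and its `to_be_replaced` values suggest, that a later write with the same timestamp replaces the earlier one. A read "as of" a timestamp returns the version with the greatest timestamp not after it, and a `null` value stands for a tombstone.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified in-memory model of versioned-store reads for a single key.
// Ignores history retention, segments, and serialization: a tool for writing
// down expected test results, not a replacement for the store.
class VersionedKeyModel {
    // validFrom timestamp -> value; a null value represents a tombstone (delete)
    private final TreeMap<Long, String> versions = new TreeMap<>();

    // Assumption: a later put with the same timestamp replaces the earlier one.
    void put(String value, long timestamp) {
        versions.put(timestamp, value);
    }

    // Latest value, or null if nothing was written or the newest version is a tombstone.
    String get() {
        Map.Entry<Long, String> latest = versions.lastEntry();
        return latest == null ? null : latest.getValue();
    }

    // Value of the version whose interval [validFrom, validTo) contains asOf.
    String get(long asOf) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOf);
        return entry == null ? null : entry.getValue();
    }
}
```

Feeding the records of `shouldRestore` into the model in changelog order reproduces that test's expectations. For example, `get(SEGMENT_INTERVAL + 5)` yields `vp1`. Annotating `shouldRestoreWithNullsAndRepeatTimestamps` the same way is where the "last write at a timestamp wins" assumption matters.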
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102260418

##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient restoreClient = restoreWriteBuffer.getClient();

Review Comment:
I see -- fair enough (in general, I am a fan of strong typing whenever possible, but it's ok I guess).
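The loop at the top of `restoreBatch` advances `observedStreamTime` to the largest timestamp in the batch before any record is written. The standalone sketch below isolates that step. It uses a hypothetical `ChangelogRecord` as a stand-in for `ConsumerRecord<byte[], byte[]>`. Stream time only moves forward, and an empty batch leaves it unchanged.

The `isExpired` helper rests on an assumption that this excerpt does not confirm: that a version older than `streamTime - historyRetention` falls outside history retention. Under that assumption, computing stream time up front means every record in the batch is judged against the same, final stream time.

```java
import java.util.Collection;

class StreamTimeSketch {
    // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>; only the timestamp matters here.
    static final class ChangelogRecord {
        final long timestamp;

        ChangelogRecord(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Mirrors the loop in restoreBatch(): stream time becomes the max of its
    // current value and every record timestamp in the batch.
    static long advanceStreamTime(long observedStreamTime, Collection<ChangelogRecord> batch) {
        long result = observedStreamTime;
        for (ChangelogRecord record : batch) {
            result = Math.max(result, record.timestamp);
        }
        return result;
    }

    // Assumption: a record version is outside history retention once it is
    // older than streamTime - historyRetention.
    static boolean isExpired(long timestamp, long streamTime, long historyRetention) {
        return timestamp < streamTime - historyRetention;
    }
}
```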
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102259793

##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ *
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addTo
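The class comment and field comments describe the buffer's layout:

- one map for the latest value store, with `Optional` values so that tombstones are still written;
- one map per segment, held in a reverse-sorted `TreeMap`;
- a flush that writes segments before the latest value store.

The simplified sketch below mirrors that layout. All names are hypothetical and values are plain strings. Instead of adding entries to a RocksDB `WriteBatch`, "flushing" only records the operations in the order they would be written.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Simplified sketch of a restore write buffer; names and types are hypothetical.
class WriteBufferSketch {
    // Latest value store buffer. Optional.empty() marks a tombstone that must still be written.
    private final Map<String, Optional<String>> latest = new HashMap<>();
    // Segment id -> buffered entries. The reversed comparator keeps newer (higher-id) segments first.
    private final TreeMap<Long, Map<String, String>> segments =
        new TreeMap<>((x, y) -> Long.compare(y, x));

    void putLatest(String key, String valueOrNull) {
        latest.put(key, Optional.ofNullable(valueOrNull));
    }

    void putSegment(long segmentId, String key, String value) {
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    // Segment ids >= minId, newest first: with the reversed order these form a head map.
    List<Long> reverseSegmentIds(long minId) {
        return new ArrayList<>(segments.headMap(minId, true).keySet());
    }

    // Returns the operations in write order, then clears the buffer.
    // Segments are flushed before the latest value store.
    List<String> flush() {
        List<String> ops = new ArrayList<>();
        for (long id : reverseSegmentIds(Long.MIN_VALUE)) {
            for (String key : new TreeMap<>(segments.get(id)).keySet()) {
                ops.add("segment " + id + " put " + key);
            }
        }
        for (Map.Entry<String, Optional<String>> e : new TreeMap<>(latest).entrySet()) {
            ops.add((e.getValue().isPresent() ? "latest put " : "latest delete ") + e.getKey());
        }
        segments.clear();
        latest.clear();
        return ops;
    }
}
```

The reversed comparator is why a "segments newer than X" query becomes a cheap `headMap` view rather than a descending copy.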
[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102257544

##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ *
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ *
+ * a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ * associated with the same key, and is implicitly associated with the record. This timestamp
+ * can change as new records are inserted into the store.
+ *
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ *
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static
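The javadoc says that older versions live in segment stores according to their `validTo` timestamp, so that whole segments can expire together. The arithmetic can be sketched as follows. The sketch assumes that a segment id is `timestamp / segmentInterval`, which this excerpt does not show, and its expiry rule is likewise a simplified assumption. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helpers sketching how old record versions map to segments.
class SegmentPlacementSketch {
    // Assumption: segment id = timestamp / segmentInterval.
    static long segmentId(long timestamp, long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // versions: validFrom -> value for one key. Every version except the newest is placed
    // in the segment of its validTo, i.e. the next version's validFrom. The newest version
    // would live in the latest value store and is not returned.
    static SortedMap<Long, List<Long>> placeOldVersions(SortedMap<Long, String> versions, long segmentInterval) {
        SortedMap<Long, List<Long>> placement = new TreeMap<>();
        List<Long> timestamps = new ArrayList<>(versions.keySet());
        for (int i = 0; i + 1 < timestamps.size(); i++) {
            long validFrom = timestamps.get(i);
            long validTo = timestamps.get(i + 1);
            placement.computeIfAbsent(segmentId(validTo, segmentInterval), id -> new ArrayList<>())
                .add(validFrom);
        }
        return placement;
    }

    // Simplified assumption: a segment can be dropped once even its largest possible
    // validTo is older than streamTime - historyRetention.
    static boolean segmentExpired(long segmentId, long segmentInterval, long streamTime, long historyRetention) {
        long maxValidToInSegment = (segmentId + 1) * segmentInterval - 1;
        return maxValidToInSegment < streamTime - historyRetention;
    }
}
```

Take a segment interval of 100 000 (the `SEGMENT_INTERVAL` used in the tests below) and versions at 99 990, 100 010 and 100 020. The version written at 99 990 lands in segment 1, not segment 0, because its `validTo` (100 010) does.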
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102256035

##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBVersionedStoreTest {
+
+    private static final String STORE_NAME = "myversionedrocks";
+    private static final String METRICS_SCOPE = "versionedrocksdb";
+    private static final long HISTORY_RETENTION = 300_000L;
+    private static final long SEGMENT_INTERVAL = HISTORY_RETENTION / 3;
+    private static final long BASE_TIMESTAMP = 10L;
+    private static final Serializer STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer STRING_DESERIALIZER = new StringDeserializer();
+
+    private InternalMockProcessorContext context;
+
+    private RocksDBVersionedStore store;
+
+    @Before
+    public void before() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        );
+        context.setTime(BASE_TIMESTAMP);
+
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, HISTORY_RETENTION, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+    }
+
+    @After
+    public void after() {
+        store.close();
+    }
+
+    @Test
+    public void shouldPutLatest() {
+        putToStore("k", "v", BASE_TIMESTAMP);
+        putToStore("k", "v2", BASE_TIMESTAMP + 1);
+
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);
+    }
+
+    @Test
+    public void shouldPutNullAsLatest() {
+        putToStore("k", null, BASE_TIMESTAMP);
+        putToStore("k", null, BASE_TIMESTAMP + 1);
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 2);
+    }
+
+    @Test
+    public void shouldPutOlderWithNonNullLatest() {
+        putToStore("k", "v", BASE_TIMESTAMP);
+        putToStore("k", "v2", BASE_TIMESTAMP - 2);
+        putToStore("k", "v1", BASE_TIMESTAMP - 1);
+        putToStore("k", "v4", BASE_TIMESTAMP - 4);
+
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4);
+    }
+
+    @Test
+    public void shouldPutOlderWithNullLatest() {
+        putToStore("k", null, BASE_TIM
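`shouldPutOlderWithNonNullLatest` writes versions out of order: first at `BASE_TIMESTAMP`, then at `- 2`, `- 1` and `- 4`. The expected reads follow from the validity intervals described in the store's javadoc. Each version is valid from its own timestamp (inclusive) to the next version's timestamp (exclusive). A small hypothetical helper that lists those intervals makes the expectations easy to check by eye.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

// Hypothetical helper: lists [validFrom, validTo) for every version of one key.
class ValidityIntervals {
    // versions: validFrom -> value, sorted by timestamp. The newest version is open-ended.
    static List<String> describe(SortedMap<Long, String> versions) {
        List<String> out = new ArrayList<>();
        List<Long> timestamps = new ArrayList<>(versions.keySet());
        for (int i = 0; i < timestamps.size(); i++) {
            long validFrom = timestamps.get(i);
            String validTo = i + 1 < timestamps.size() ? String.valueOf(timestamps.get(i + 1)) : "inf";
            out.add(versions.get(validFrom) + ": [" + validFrom + ", " + validTo + ")");
        }
        return out;
    }
}
```

With `BASE_TIMESTAMP = 10`, the intervals are `v4: [6, 8)`, `v2: [8, 9)`, `v1: [9, 10)` and `v: [10, inf)`. A read as of `BASE_TIMESTAMP - 3` (7) therefore falls in `v4`'s interval, which is why the test expects `v4` with timestamp `BASE_TIMESTAMP - 4`.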
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102252265

##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
[GitHub] [kafka] ouyangnengda commented on pull request #13220: Kafka 14253
ouyangnengda commented on PR #13220:
URL: https://github.com/apache/kafka/pull/13220#issuecomment-1425139081

Hi @divijvaidya, could you please review this PR? Thank you.
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102248187

##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102246615 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -0,0 +1,435 @@
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102246409 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -0,0 +1,435 @@
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102245805 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -0,0 +1,435 @@
... +public class RocksDBVersionedStoreTest { Review Comment: Can we add a test with retention time zero (maybe in a follow-up PR)? Having no segments and `segment_interval = 0` (?) seems to be a corner case we should also support. Btw: it seems we don't test whether `delete()` returns the right "old value"; can we also add some tests for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
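On the zero-retention corner case: the test derives `SEGMENT_INTERVAL` as `HISTORY_RETENTION / 3`, so a retention of zero also gives a segment interval of zero. Assuming (this thread does not confirm it) that segment IDs are computed as `timestamp / segmentInterval`, a zero interval has to be special-cased, because Java long division by zero throws `ArithmeticException`. A minimal sketch, using the hypothetical class `SegmentIdSketch`:

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration only; it is NOT Kafka's segment code.
// It assumes segments are addressed as timestamp / segmentInterval.
final class SegmentIdSketch {
    private final long segmentInterval;

    SegmentIdSketch(final long segmentInterval) {
        if (segmentInterval < 0) {
            throw new IllegalArgumentException("segmentInterval must not be negative");
        }
        this.segmentInterval = segmentInterval;
    }

    // Returns the segment ID for a timestamp, or empty when the store keeps no
    // history segments (segmentInterval == 0). Without this guard, the division
    // below would throw ArithmeticException for a zero interval.
    OptionalLong segmentId(final long timestamp) {
        if (segmentInterval == 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(timestamp / segmentInterval);
    }

    public static void main(final String[] args) {
        final long historyRetention = 0L;
        final long segmentInterval = historyRetention / 3; // same derivation as the test's SEGMENT_INTERVAL
        System.out.println(new SegmentIdSketch(segmentInterval).segmentId(10L).isPresent()); // false
        System.out.println(new SegmentIdSketch(100_000L).segmentId(250_000L).getAsLong());   // 2
    }
}
```

Returning an empty `OptionalLong` is just one way to express "no segments"; a real store might instead route all writes to the latest-value store when retention is zero.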
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102243061 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -0,0 +1,435 @@
... +@Test +public void shouldPutNullAsLatest() { +putToStore("k", null, BASE_TIMESTAMP); Review Comment: Should we not first do `putToStore("k", "v", BASE_TIMESTAMP);` to ensure that the `put(null)` really deletes `v`?
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102242415 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -0,0 +1,435 @@
... +@Test +public void shouldPutLatest() { +putToStore("k", "v", BASE_TIMESTAMP); +putToStore("k", "v2", BASE_TIMESTAMP + 1); + +verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1); Review Comment: That's an interesting one... Users can now also query using a "future timestamp" -- nothing wrong with it; just an interesting "side effect" :)
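Both test comments above can be illustrated with a tiny single-key model. This is a hypothetical `TreeMap`-based sketch, not `RocksDBVersionedStore`: a timestamped get is a `floorEntry` lookup, which is why a query with a "future" timestamp simply returns the latest version, and writing `v` before the null puts (as suggested) shows that a null put at the same timestamp really replaces it.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of a versioned store, for illustration only;
// it is not RocksDBVersionedStore. A null value is stored as a tombstone.
final class VersionHistorySketch {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    // A second put with the same timestamp replaces the first.
    void put(final String value, final long timestamp) {
        versions.put(timestamp, value);
    }

    // Latest value with timestamp <= asOf; null if there is none or it is a tombstone.
    String getAsOf(final long asOf) {
        final Map.Entry<Long, String> entry = versions.floorEntry(asOf);
        return entry == null ? null : entry.getValue();
    }

    public static void main(final String[] args) {
        final long base = 10L;

        // shouldPutLatest: a query "from the future" returns the latest version.
        final VersionHistorySketch latest = new VersionHistorySketch();
        latest.put("v", base);
        latest.put("v2", base + 1);
        System.out.println(latest.getAsOf(base + 2)); // v2

        // Suggested variant of shouldPutNullAsLatest: write "v" first.
        final VersionHistorySketch tombstone = new VersionHistorySketch();
        tombstone.put("v", base);
        System.out.println(tombstone.getAsOf(base));     // v
        tombstone.put(null, base);                       // replaces "v" at the same timestamp
        tombstone.put(null, base + 1);
        System.out.println(tombstone.getAsOf(base));     // null
        System.out.println(tombstone.getAsOf(base + 1)); // null
    }
}
```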
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102238142 ## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ## @@ -0,0 +1,99 @@ ... +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; + +/** + * A key-value store that stores multiple record versions per key, and supports timestamp-based + * retrieval operations to return the latest record (per key) as of a specified timestamp. + * Only one record is stored per key and timestamp, i.e., a second call to + * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first. + * + * Each store instance has an associated, fixed-duration "history retention" which specifies + * how long old record versions should be kept for. In particular, a versioned store guarantees + * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp + * bound is within history retention of the current observed stream time.
(Queries with timestamp + * bound older than the specified history retention are considered invalid.) + * + * @param The key type + * @param The value type + */ +public interface VersionedKeyValueStore extends StateStore { + +/** + * Add a new record version associated with this key. + * + * @param key The key + * @param value The value, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as a delete + * @param timestamp The timestamp for this record version + * @throws NullPointerException If {@code null} is used for key. + */ +void put(K key, V value, long timestamp); + +/** + * Delete the value associated with this key from the store, at the specified timestamp + * (if there is such a value), and return the deleted value. + * + * This operation is semantically equivalent to {@link #get(Object, long)} #get(key, timestamp))} + * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}. + * + * @param key The key + * @param timestamp The timestamp for this delete + * @return The value and timestamp of the latest record associated with this key + * as of the deletion timestamp (inclusive), or {@code null} if any of + * (1) the store contains no records for this key, (2) the latest record + * for this key as of the deletion timestamp is a tombstone, or + * (3) the deletion timestamp is older than this store's history retention + * (i.e., this store no longer contains data for the provided timestamp). + * @throws NullPointerException If {@code null} is used for key. + */ +VersionedRecord delete(K key, long timestamp); + +/** + * Get the latest (by timestamp) record associated with this key. + * + * @param key The key to fetch + * @return The value and timestamp of the latest record associated with this key, or + * {@code null} if either (1) the store contains no records for this key or (2) the + * latest record for this key is a tombstone. + * @throws NullPointerException If null is used for key. 
+ * @throws InvalidStateStoreException if the store is not initialized + */ +VersionedRecord get(K key); + +/** + * Get the latest record associated with this key with timestamp not exceeding the specified + * timestamp bound. Review Comment: Fair point. > particularly because during KIP discussion one person mentioned that these terms did not feel intuitive to them I cannot remember this; personally I always found it very intuitive (also because of its use in temporal tables in SQL), but of course this could just be me. I agree that we should not use it only here, but rather introduce it "globally" or not use it at all. It's "just" JavaDocs and easy to change in follow-up releases if they cause confusion, so I don't feel strongly about
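The Javadoc defines `delete(key, timestamp)` as `get(key, timestamp)` followed by `put(key, null, timestamp)`, returning `null` when there is no record, when the latest record is a tombstone, or when the timestamp is older than history retention. As a possible basis for the `delete()` old-value tests requested earlier in this thread, here is a hypothetical single-key model of that contract; the class names and the rule "observed stream time = max put timestamp" are assumptions for illustration, not the Kafka implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of the Javadoc'd get/put/delete contract,
// for illustration only; it is not the Kafka implementation.
final class VersionedDeleteSketch {
    static final class VersionedValue {
        final String value;
        final long timestamp;

        VersionedValue(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long historyRetention;
    private final TreeMap<Long, String> versions = new TreeMap<>(); // null value = tombstone
    private boolean sawRecord = false;
    private long observedStreamTime = 0L; // assumed: max timestamp seen by put()

    VersionedDeleteSketch(final long historyRetention) {
        this.historyRetention = historyRetention;
    }

    void put(final String value, final long timestamp) {
        observedStreamTime = sawRecord ? Math.max(observedStreamTime, timestamp) : timestamp;
        sawRecord = true;
        versions.put(timestamp, value);
    }

    // Latest record with timestamp <= asOf, or null if there is none, it is a
    // tombstone, or asOf is older than history retention allows.
    VersionedValue get(final long asOf) {
        if (sawRecord && asOf < observedStreamTime - historyRetention) {
            return null;
        }
        final Map.Entry<Long, String> entry = versions.floorEntry(asOf);
        if (entry == null || entry.getValue() == null) {
            return null;
        }
        return new VersionedValue(entry.getValue(), entry.getKey());
    }

    // Per the Javadoc: equivalent to get(timestamp) followed by put(null, timestamp).
    VersionedValue delete(final long timestamp) {
        final VersionedValue old = get(timestamp);
        put(null, timestamp);
        return old;
    }

    public static void main(final String[] args) {
        final VersionedDeleteSketch store = new VersionedDeleteSketch(100L);
        store.put("v", 10L);
        final VersionedValue old = store.delete(20L);
        System.out.println(old.value + "@" + old.timestamp); // v@10
        System.out.println(store.get(20L) == null);          // true: tombstone at 20
        System.out.println(store.get(15L).value);            // v: history before the delete is kept
    }
}
```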
[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
mjsax commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1102235180 ## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ## @@ -0,0 +1,99 @@
... +public interface VersionedKeyValueStore extends StateStore { + +/** + * Add a new record version associated with this key. + * + * @param key The key + * @param value The value, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as a delete Review Comment: This PR was controversial... and I am still not a fan of it -> https://github.com/apache/kafka/pull/7679#issuecomment-581695959 It can potentially break things; of course, if users follow the `null<->null` and `non-null<->non-null` pattern, the PR does not do any harm.
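To make the `null<->null` / `non-null<->non-null` point concrete: if a put counts as a delete exactly when the serialized bytes are null (one reading of the Javadoc wording above, not a confirmed description of the implementation), then a serializer that breaks the pattern changes what gets deleted. The names below are hypothetical and use a plain `Function` rather than Kafka's `Serializer`.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical illustration; the names are not Kafka APIs. It assumes a put is
// treated as a delete exactly when the serialized bytes are null.
final class TombstoneCheckSketch {
    static <V> boolean isDelete(final V value, final Function<V, byte[]> serializer) {
        return serializer.apply(value) == null;
    }

    // Follows the null<->null and non-null<->non-null pattern.
    static final Function<String, byte[]> WELL_BEHAVED =
        s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8);

    // Breaks the pattern: the non-null empty string serializes to null.
    static final Function<String, byte[]> EMPTY_AS_NULL =
        s -> s == null || s.isEmpty() ? null : s.getBytes(StandardCharsets.UTF_8);

    // Breaks the pattern the other way: null serializes to non-null (empty) bytes.
    static final Function<String, byte[]> NULL_AS_EMPTY =
        s -> s == null ? new byte[0] : s.getBytes(StandardCharsets.UTF_8);

    public static void main(final String[] args) {
        System.out.println(isDelete(null, WELL_BEHAVED));  // true
        System.out.println(isDelete("", WELL_BEHAVED));    // false
        System.out.println(isDelete("", EMPTY_AS_NULL));   // true: a non-null value becomes a delete
        System.out.println(isDelete(null, NULL_AS_EMPTY)); // false: a null value is not a delete
    }
}
```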
[GitHub] [kafka] guozhangwang commented on pull request #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1
guozhangwang commented on PR #13228: URL: https://github.com/apache/kafka/pull/13228#issuecomment-1425085767 cc @lucasbru, I will update this after #13025 is done and the pausing logic has been extracted out of the `checkAllUpdatingTaskStates` function.
[GitHub] [kafka] guozhangwang opened a new pull request, #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1
guozhangwang opened a new pull request, #13228: URL: https://github.com/apache/kafka/pull/13228 This should only be reviewed after #13025. 1. Added thread-level restoration metrics. 2. Added related unit tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio opened a new pull request, #13227: KAFKA-14693; Kafka node should halt instead of exit
jsancio opened a new pull request, #13227: URL: https://github.com/apache/kafka/pull/13227 Extend the implementation of ProcessTerminatingFaultHandler to support calling either Exit.halt or Exit.exit. Change the fault handler used by the controller thread and the KRaft thread to a halting fault handler. Those two threads cannot call Exit.exit because Runtime.exit joins on the default shutdown hook thread, while the shutdown hook thread joins on the controller and KRaft threads terminating. Each side waits for the other, which causes a deadlock. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
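The halt-vs-exit choice can be sketched as follows. This is a hypothetical class, not Kafka's `ProcessTerminatingFaultHandler`, and the exit/halt procedures are injected so the behaviour can be exercised without terminating the JVM. `System.exit` runs the registered shutdown hooks and waits for them, whereas `Runtime.halt` terminates immediately without running hooks, which is why halting avoids the deadlock described in the PR.

```java
import java.util.function.IntConsumer;

// Hypothetical sketch, not Kafka's ProcessTerminatingFaultHandler: a fault
// handler that terminates the process either by exiting or by halting.
final class TerminatingFaultHandlerSketch {
    enum Action { EXIT, HALT }

    private final Action action;
    private final IntConsumer exitProcedure;
    private final IntConsumer haltProcedure;

    TerminatingFaultHandlerSketch(final Action action,
                                  final IntConsumer exitProcedure,
                                  final IntConsumer haltProcedure) {
        this.action = action;
        this.exitProcedure = exitProcedure;
        this.haltProcedure = haltProcedure;
    }

    // Real JVM wiring. System.exit runs shutdown hooks and waits for them;
    // Runtime.halt terminates immediately without running any hooks.
    static TerminatingFaultHandlerSketch forJvm(final Action action) {
        return new TerminatingFaultHandlerSketch(
            action, System::exit, status -> Runtime.getRuntime().halt(status));
    }

    void handleFault(final String failureMessage, final Throwable cause) {
        System.err.println("Fatal fault: " + failureMessage + (cause == null ? "" : " (" + cause + ")"));
        final int status = 1;
        if (action == Action.HALT) {
            // Safe from a thread that a shutdown hook joins on: no hooks run.
            haltProcedure.accept(status);
        } else {
            // Would deadlock if a shutdown hook joins on the calling thread.
            exitProcedure.accept(status);
        }
    }

    public static void main(final String[] args) {
        // Recording procedures instead of forJvm(...), so this demo does not kill the JVM.
        final int[] recorded = {-1};
        final TerminatingFaultHandlerSketch handler = new TerminatingFaultHandlerSketch(
            Action.HALT,
            status -> { throw new IllegalStateException("exit not expected"); },
            status -> recorded[0] = status);
        handler.handleFault("controller thread failed", new RuntimeException("boom"));
        System.out.println("halt status = " + recorded[0]); // halt status = 1
    }
}
```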
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1425058794 > Out of an abundance of caution, what do you think about targeting your mm2-negative-offsets branch with a new PR to address [KAFKA-13659](https://issues.apache.org/jira/browse/KAFKA-13659), which can be reviewed separately but then merged to trunk in tandem with this PR? I tried to do this, but it appears that the "stacked PRs" feature for GitHub only applies to branches in the same repository, not across forks. If I were to open a stacked PR, it would be on my fork, and review comments may be lost if I accidentally delete the repository. In the interest of an atomic merge, and preserving comment history, I've added the commits here. To make your review easier, I will continue to tag the relevant commits with KAFKA-13659, so you can filter during your review. I hope this is acceptable, but sorry in advance for the difficult review.
[jira] [Updated] (KAFKA-14565) Interceptor Resource Leak
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: External issue URL: (was: https://github.com/apache/kafka/pull/13168) > Interceptor Resource Leak > - > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently, within the Kafka Consumer and Kafka Producer constructors, > *AbstractConfig.getConfiguredInstances()* is responsible for > both creating and configuring each interceptor listed in the > interceptor.classes property, and it returns a configured > *List<ConsumerInterceptor<K, V>>* or *List<ProducerInterceptor<K, V>>*. > This dual responsibility for both creation and configuration is problematic > when there are multiple interceptors, at least one interceptor's > configure method creates and/or depends on objects that > create threads, connections, or other resources that require cleanup, and > a subsequent interceptor's configure method raises a runtime exception. > That runtime exception causes a resource leak in the > first interceptor: the interceptor container, i.e. > ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the > close method of the first interceptor (or of any interceptor) is never > called.
> To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource cleanup, I propose > two approaches: > +*PROPOSAL 1*+ > Define a default *open* or *configureWithResources()* or *acquireResources()* > method with an empty implementation that declares a checked exception on the respective > Consumer/Producer interceptor interfaces. As part of the > interceptor lifecycle, this method will be responsible for creating threads > and/or objects that use threads, connections, or other resources that > require cleanup. Additionally, because its default body is empty, implementing > this method is optional: it does nothing when > unimplemented, mitigating the backward-compatibility impact on existing > interceptors. Finally, the Kafka Consumer/Producer Interceptor containers > will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* > or *maybeAcquireResources* method which also throws a checked exception. > See below code excerpt for the Consumer/Producer constructor: > {code:java} > List<ConsumerInterceptor<K, V>> interceptorList = (List) > config.getConfiguredInstances( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > this.interceptors = new ConsumerInterceptors<>(interceptorList); > this.interceptors.maybeConfigureWithResources(); > {code} > +*PROPOSAL 2*+ > To avoid changing any public interfaces and the subsequent KIP process, we can > * Create a class which inherits or wraps AbstractConfig that contains a new > method which will return a ConfiguredInstanceResult class. This > ConfiguredInstanceResult class will contain an optional list of successfully > created interceptors and/or the exception which occurred while calling each > Interceptor::configure. Additionally, it will contain a helper method to > rethrow an exception as well as a method which returns the underlying > exception.
The caller is expected to handle the exception and perform > cleanup, e.g. call Interceptor::close on each interceptor in the list provided by > the ConfiguredInstanceResult class. > * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} > instances if/when a failure occurs > * Add a new overloaded {{getConfiguredInstance}} / > {{getConfiguredInstances}} variant that allows users to specify whether > already-instantiated classes should be closed or not when a failure occurs > * Add a new exception type to the public API that includes a list of all of > the successfully-instantiated (and/or successfully-configured) instances > before the error was encountered so that callers can choose how to handle the > failure however they want (and possibly so that instantiation/configuration > can be attempted on every class before throwing the exception) > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
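The "automatically invoke close on any AutoCloseable instances if/when a failure occurs" bullet of Proposal 2 can be sketched in plain Java. This is a hypothetical helper, not an existing `AbstractConfig` method; each `Supplier` stands in for instantiating and configuring one interceptor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Supplier;

// Hypothetical helper illustrating Proposal 2's "close on failure" bullet;
// not an existing AbstractConfig method.
final class ConfiguredInstances {
    private ConfiguredInstances() {}

    // Each supplier stands in for "instantiate + configure" of one interceptor.
    static <T> List<T> createAll(List<Supplier<T>> factories) {
        List<T> created = new ArrayList<>();
        try {
            for (Supplier<T> factory : factories) {
                created.add(factory.get());
            }
            return created;
        } catch (RuntimeException failure) {
            // Close already-created instances in reverse creation order; close
            // errors are attached as suppressed exceptions so the original
            // configure failure is what the caller sees.
            ListIterator<T> it = created.listIterator(created.size());
            while (it.hasPrevious()) {
                T instance = it.previous();
                if (instance instanceof AutoCloseable) {
                    try {
                        ((AutoCloseable) instance).close();
                    } catch (Exception closeError) {
                        failure.addSuppressed(closeError);
                    }
                }
            }
            throw failure;
        }
    }
}
```

Closing in reverse order and using `addSuppressed` is one reasonable design choice; the new-exception-type bullet would instead hand the partially created list back to the caller.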
[jira] [Updated] (KAFKA-14565) Interceptor Resource Leak
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: External issue URL: https://github.com/apache/kafka/pull/13168 > Interceptor Resource Leak > - > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently, within the Kafka Consumer and Kafka Producer constructors, > *AbstractConfig.getConfiguredInstances()* is responsible for > both creating and configuring each interceptor listed in the > interceptor.classes property, and it returns a configured > *List<ConsumerInterceptor<K, V>>* or *List<ProducerInterceptor<K, V>>*. > This dual responsibility for both creation and configuration is problematic > when there are multiple interceptors, at least one interceptor's > configure method creates and/or depends on objects that > create threads, connections, or other resources that require cleanup, and > a subsequent interceptor's configure method raises a runtime exception. > That runtime exception causes a resource leak in the > first interceptor: the interceptor container, i.e. > ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the > close method of the first interceptor (or of any interceptor) is never > called.
> To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource cleanup, I propose > two approaches: > +*PROPOSAL 1*+ > Define a default *open* or *configureWithResources()* or *acquireResources()* > method with an empty implementation that declares a checked exception on the respective > Consumer/Producer interceptor interfaces. As part of the > interceptor lifecycle, this method will be responsible for creating threads > and/or objects that use threads, connections, or other resources that > require cleanup. Additionally, because its default body is empty, implementing > this method is optional: it does nothing when > unimplemented, mitigating the backward-compatibility impact on existing > interceptors. Finally, the Kafka Consumer/Producer Interceptor containers > will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* > or *maybeAcquireResources* method which also throws a checked exception. > See below code excerpt for the Consumer/Producer constructor: > {code:java} > List<ConsumerInterceptor<K, V>> interceptorList = (List) > config.getConfiguredInstances( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > this.interceptors = new ConsumerInterceptors<>(interceptorList); > this.interceptors.maybeConfigureWithResources(); > {code} > +*PROPOSAL 2*+ > To avoid changing any public interfaces and the subsequent KIP process, we can > * Create a class which inherits or wraps AbstractConfig that contains a new > method which will return a ConfiguredInstanceResult class. This > ConfiguredInstanceResult class will contain an optional list of successfully > created interceptors and/or the exception which occurred while calling each > Interceptor::configure. Additionally, it will contain a helper method to > rethrow an exception as well as a method which returns the underlying > exception.
The caller is expected to handle the exception and perform > cleanup, e.g. call Interceptor::close on each interceptor in the list provided by > the ConfiguredInstanceResult class. > * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} > instances if/when a failure occurs > * Add a new overloaded {{getConfiguredInstance}} / > {{getConfiguredInstances}} variant that allows users to specify whether > already-instantiated classes should be closed or not when a failure occurs > * Add a new exception type to the public API that includes a list of all of > the successfully-instantiated (and/or successfully-configured) instances > before the error was encountered so that callers can choose how to handle the > failure however they want (and possibly so that instantiation/configuration > can be attempted on every class before throwing the exception) > > >
[jira] [Updated] (KAFKA-14703) Don't resign when failing to replay uncommitted records
[ https://issues.apache.org/jira/browse/KAFKA-14703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14703: --- Description: h1. Problem The KRaft controller replays both committed and uncommitted records. Committed records are replayed by the inactive controller. Uncommitted records are replayed by the active controller. When handling an RPC, the active controller generates a response and a list of uncommitted records. The active controller replays the uncommitted records before sending them to the KRaft layer for durability and replication. If the active controller encounters an error when replaying the uncommitted records, it calls the process exit fault handler. Indirectly, the process exit fault handler resigns its KRaft leadership and closes all of the client connections. Most clients retry the RPC when they disconnect from the remote endpoint. If the RPC's replay error is deterministic, then it is possible for the failure to propagate to all of the controllers as they become leaders. This handling may cause the controllers to become unavailable. h1. Solution We can prevent this failure from propagating to all of the controllers by changing how we handle errors when replaying uncommitted records. The active controller doesn't need to exit fatally if it fails to replay an uncommitted record. The active controller should instead fail the RPC with an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot taken before the RPC was handled. h1. Drawback This solution doesn't work if the error is in the Timeline data structures themselves and the controller is unable to revert to the previous state with SnapshotRegistry::revertToSnapshot. was: h1. Problem The KRaft controller replays both committed and uncommitted records. Committed records are replayed by the inactive controller. Uncommitted records are replayed by the active controller.
When handling an RPC, the active controller generates a response and a list of uncommitted records. The active controller replays the uncommitted records before sending them to the KRaft layer for durability and replication. If the active controller encounters an error when replaying the uncommitted records, it calls the process exit fault handler. Indirectly, the process exit fault handler resigns its KRaft leadership and closes all of the client connections. Most clients retry the RPC when they disconnect from the remote endpoint. If the RPC's replay error is deterministic, then it is possible for the failure to propagate to all of the controllers as they become leaders. This handling may cause the controllers to become unavailable. h1. Solution We can prevent this failure from propagating to all of the controllers by changing how we handle errors when replaying uncommitted records. The active controller doesn't need to exit fatally if it fails to replay an uncommitted record. The active controller should instead fail the RPC with an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot taken before the RPC was handled. > Don't resign when failing to replay uncommitted records > --- > > Key: KAFKA-14703 > URL: https://issues.apache.org/jira/browse/KAFKA-14703 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: José Armando García Sancio >Priority: Major > > h1. Problem > The KRaft controller replays both committed and uncommitted records. > Committed records are replayed by the inactive controller. Uncommitted > records are replayed by the active controller. > When handling an RPC, the active controller generates a response and a list of > uncommitted records. The active controller replays the uncommitted records > before sending them to the KRaft layer for durability and replication. If the > active controller encounters an error when replaying the uncommitted records, > it calls the process exit fault handler.
> Indirectly, the process exit fault handler resigns its KRaft leadership and > closes all of the client connections. > Most clients retry the RPC when they disconnect from the remote endpoint. > If the RPC's replay error is deterministic, then it is possible for the > failure to propagate to all of the controllers as they become leaders. This > handling may cause the controllers to become unavailable. > h1. Solution > We can prevent this failure from propagating to all of the controllers by > changing how we handle errors when replaying uncommitted records. The active > controller doesn't need to exit fatally if it fails to replay an > uncommitted record. The active controller should instead fail the RPC with > an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot > before t
[GitHub] [kafka] elarib closed pull request #5405: KAFKA-7189 : Add a Merge Transformer for Kafka Connect
elarib closed pull request #5405: KAFKA-7189 : Add a Merge Transformer for Kafka Connect URL: https://github.com/apache/kafka/pull/5405
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1424973150 One concern is that the ConcurrentHashMap could take up more space. This metric was introduced to try to get this memory usage under control.
[jira] [Commented] (KAFKA-14703) Don't resign when failing to replay uncommitted records
[ https://issues.apache.org/jira/browse/KAFKA-14703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686798#comment-17686798 ] Colin McCabe commented on KAFKA-14703: -- Sorry, but I don’t think this will work. We don’t know what state the in-memory data structures are in. Exiting is the only safe course of action. It is unfortunate that we need to exit, but every bug is unfortunate. Controllers restart quickly so it should not be a big problem in practice. > Don't resign when failing to replay uncommitted records > --- > > Key: KAFKA-14703 > URL: https://issues.apache.org/jira/browse/KAFKA-14703 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: José Armando García Sancio >Priority: Major > > h1. Problem > The KRaft controller replays both committed and uncommitted records. > Committed records are replayed by the inactive controller. Uncommitted > records are replayed by the active controller. > When handling an RPC, the active controller generates a response and a list of > uncommitted records. The active controller replays the uncommitted records > before sending them to the KRaft layer for durability and replication. If the > active controller encounters an error when replaying the uncommitted records, > it calls the process exit fault handler. > Indirectly, the process exit fault handler resigns its KRaft leadership and > closes all of the client connections. > Most clients retry the RPC when they disconnect from the remote endpoint. > If the RPC's replay error is deterministic, then it is possible for the > failure to propagate to all of the controllers as they become leaders. This > handling may cause the controllers to become unavailable. > h1. Solution > We can prevent this failure from propagating to all of the controllers by > changing how we handle errors when replaying uncommitted records.
The active > controller doesn't need to exit fatally if it fails to replay an > uncommitted record. The active controller should instead fail the RPC with > an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot > taken before the RPC was handled.
[jira] [Commented] (KAFKA-14546) Allow Partitioner to return -1 to indicate default partitioning
[ https://issues.apache.org/jira/browse/KAFKA-14546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686780#comment-17686780 ] James Olsen commented on KAFKA-14546: - [~viktorsomogyi] Is this on anyone's radar? This will become a show-stopper if DefaultPartitioner is eventually removed. > Allow Partitioner to return -1 to indicate default partitioning > --- > > Key: KAFKA-14546 > URL: https://issues.apache.org/jira/browse/KAFKA-14546 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 3.3.1 >Reporter: James Olsen >Priority: Major > > Prior to KIP-794, it was possible to create a custom Partitioner that could > delegate to the DefaultPartitioner. DefaultPartitioner has been deprecated, > so we can now only delegate to BuiltInPartitioner.partitionForKey, which does > not handle a non-keyed message. Hence there is now no mechanism for a custom > Partitioner to fall back to default partitioning, e.g. for the non-keyed > sticky case. > I would like to propose that KafkaProducer.partition(...) not throw > IllegalArgumentException if the Partitioner returns > RecordMetadata.UNKNOWN_PARTITION and instead continue with the default > behaviour. Maybe with a configuration flag to enable this behaviour so as > not to break existing expectations? > Why was Partitioner delegation with default fallback useful? > # A single Producer can be used to write to multiple Topics where each Topic > may have different partitioning requirements. The Producer can only have a > single Partitioner, so the Partitioner needs to be able to switch behaviour > based on the Topic, including the need to fall back to default behaviour if a > given Topic does not have a custom requirement. > # Multiple services may need to produce to the same Topic and these services > may be authored by different teams.
A single custom Partitioner that > encapsulates all Topic-specific partitioning logic can be used by all teams > at all times for all Topics, ensuring that mistakes are not made.
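The requested behaviour, where a per-topic rule returns `-1` (the value of `RecordMetadata.UNKNOWN_PARTITION`) to ask for default partitioning, can be sketched independently of the producer. Everything here is hypothetical: `FallbackPartitioning` and `TopicRule` are illustrative names rather than Kafka APIs, the keyed fallback uses `Arrays.hashCode` as a stand-in for the murmur2 hashing Kafka's built-in keyed partitioning uses, and the unkeyed fallback is a simplified sticky choice that never rotates.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the proposal; not KafkaProducer's actual logic.
final class FallbackPartitioning {
    // Same value as RecordMetadata.UNKNOWN_PARTITION.
    static final int UNKNOWN_PARTITION = -1;

    interface TopicRule {
        int partition(String topic, byte[] key, int numPartitions);
    }

    private final Map<String, TopicRule> rules;
    private final Map<String, Integer> stickyPartition = new ConcurrentHashMap<>();

    FallbackPartitioning(Map<String, TopicRule> rules) {
        this.rules = rules;
    }

    int partition(String topic, byte[] key, int numPartitions) {
        TopicRule rule = rules.get(topic);
        int chosen = rule == null ? UNKNOWN_PARTITION : rule.partition(topic, key, numPartitions);
        if (chosen != UNKNOWN_PARTITION) {
            return chosen; // the custom rule made a decision
        }
        if (key != null) {
            // Stand-in hash; Kafka's built-in keyed partitioning uses murmur2.
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // Simplified "sticky" choice for unkeyed records: one random partition
        // per topic, never rotated (real sticky partitioning switches over time).
        return stickyPartition.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }
}
```

This mirrors the two use cases above: one partitioner instance serves many topics, and topics without a custom requirement (or records a rule declines to handle) get default-like behaviour.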
[jira] [Created] (KAFKA-14703) Don't resign when failing to replay uncommitted records
José Armando García Sancio created KAFKA-14703: -- Summary: Don't resign when failing to replay uncommitted records Key: KAFKA-14703 URL: https://issues.apache.org/jira/browse/KAFKA-14703 Project: Kafka Issue Type: Improvement Components: controller Reporter: José Armando García Sancio h1. Problem The KRaft controller replays both committed and uncommitted records. Committed records are replayed by the inactive controller. Uncommitted records are replayed by the active controller. When handling an RPC, the active controller generates a response and a list of uncommitted records. The active controller replays the uncommitted records before sending them to the KRaft layer for durability and replication. If the active controller encounters an error when replaying the uncommitted records, it calls the process exit fault handler. Indirectly, the process exit fault handler resigns its KRaft leadership and closes all of the client connections. Most clients retry the RPC when they disconnect from the remote endpoint. If the RPC's replay error is deterministic, then it is possible for the failure to propagate to all of the controllers as they become leaders. This handling may cause the controllers to become unavailable. h1. Solution We can prevent this failure from propagating to all of the controllers by changing how we handle errors when replaying uncommitted records. The active controller doesn't need to exit fatally if it fails to replay an uncommitted record. The active controller should instead fail the RPC with an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot taken before the RPC was handled.
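The proposed solution (fail the RPC with UNKNOWN_ERROR and restore the pre-RPC state instead of exiting) can be illustrated with a copy-based snapshot. This is a hypothetical sketch: `ReplayWithRevert` is an illustrative name, a `HashMap` copy stands in for the controller's timeline data structures and `SnapshotRegistry`, and the returned strings stand in for real error codes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of "fail the RPC and revert instead of exiting".
// A full HashMap copy stands in for timeline structures + SnapshotRegistry.
final class ReplayWithRevert {
    private Map<String, Long> state = new HashMap<>();

    // Replays one RPC's uncommitted records. Returns "NONE" on success; on a
    // replay error, restores the pre-RPC snapshot and returns "UNKNOWN_ERROR"
    // instead of terminating the process.
    String replayUncommitted(List<Consumer<Map<String, Long>>> records) {
        Map<String, Long> snapshot = new HashMap<>(state);
        try {
            for (Consumer<Map<String, Long>> record : records) {
                record.accept(state);
            }
            return "NONE";
        } catch (RuntimeException e) {
            state = snapshot; // discard partial effects of this RPC
            return "UNKNOWN_ERROR";
        }
    }

    // Immutable view of the current state, for inspection.
    Map<String, Long> view() {
        return Map.copyOf(state);
    }
}
```

As the Drawback section of the updated description and the follow-up comment on this issue point out, reverting only helps if the in-memory structures are themselves still consistent. A full copy as in this sketch sidesteps that concern, but costs time and memory proportional to the state size on every RPC.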
[GitHub] [kafka] C0urante commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
C0urante commented on code in PR #12366: URL: https://github.com/apache/kafka/pull/12366#discussion_r1101986943 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -188,11 +196,52 @@ public ConfigDef config() { return MirrorSourceConfig.CONNECTOR_CONFIG_DEF; } +@Override +public org.apache.kafka.common.config.Config validate(Map<String, String> props) { +List<ConfigValue> configValues = MirrorSourceConfig.CONNECTOR_CONFIG_DEF.validate(props); +if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) { +if (!consumerUsesReadCommitted(props)) { +ConfigValue exactlyOnceSupport = configValues.stream() +.filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name())) +.findAny() +.orElseGet(() -> { +ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG); +configValues.add(result); +return result; +}); +// The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED +// from our exactlyOnceSupport method, but it will be fairly generic +// We add a second error message here to give users more insight into why this specific connector can't support exactly-once +// guarantees with the given configuration +exactlyOnceSupport.addErrorMessage( +"MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is configured with " ++ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; " ++ "otherwise, records from aborted and uncommitted transactions will be replicated from the " ++ "source cluster to the target cluster." +); +} +} +return new org.apache.kafka.common.config.Config(configValues); +} + @Override public String version() { return AppInfoParser.getVersion(); } +@Override +public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { +return consumerUsesReadCommitted(props) +? 
ExactlyOnceSupport.SUPPORTED +: ExactlyOnceSupport.UNSUPPORTED; +} + +private boolean consumerUsesReadCommitted(Map<String, String> props) { +MirrorSourceConfig config = new MirrorSourceConfig(props); Review Comment: This turned out to be much trickier to address than expected, but I've made this method impervious to unrelated invalid properties. This required being able to determine the full config used for the source consumer without constructing a `MirrorSourceConfig` instance, which in turn required (or at least, was facilitated by) pulling out the find-all-entries-with-prefix logic in `AbstractConfig::originalsWithPrefix` into a static method in `Utils`. I couldn't think of a less invasive way to accomplish this, but it does seem worth the tradeoff; LMKWYT.
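The approach described in this review comment, computing the source consumer's `isolation.level` from the raw properties with a static prefix filter instead of constructing a `MirrorSourceConfig`, might look roughly like the following. `ReadCommittedCheck` and `entriesWithPrefix` are hypothetical names, not the actual `Utils` method added in the PR, and the `source.consumer.` prefix used in the example is an assumption about how the source consumer's properties are namespaced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: read the source consumer's isolation.level straight from
// raw connector properties, without constructing (and validating) a full
// config object, so unrelated invalid properties cannot make the check throw.
final class ReadCommittedCheck {
    private ReadCommittedCheck() {}

    // Static analogue of "find all entries with a prefix": returns matching
    // entries with the prefix stripped from each key.
    static Map<String, String> entriesWithPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // consumerPrefix is an assumption, e.g. "source.consumer.".
    static boolean consumerUsesReadCommitted(Map<String, String> props, String consumerPrefix) {
        String level = entriesWithPrefix(props, consumerPrefix).get("isolation.level");
        // The consumer default is read_uncommitted, so an absent value means false.
        return "read_committed".equals(level);
    }
}
```

Because only the one relevant property is inspected, a malformed unrelated value such as `tasks.max=not-a-number` cannot cause the check to fail.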
[GitHub] [kafka] C0urante commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
C0urante commented on code in PR #12366: URL: https://github.com/apache/kafka/pull/12366#discussion_r1101982892 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -188,11 +196,52 @@ public ConfigDef config() { return MirrorSourceConfig.CONNECTOR_CONFIG_DEF; } +@Override +public org.apache.kafka.common.config.Config validate(Map<String, String> props) { +List<ConfigValue> configValues = MirrorSourceConfig.CONNECTOR_CONFIG_DEF.validate(props); Review Comment: Fine by me, done.
[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
littlehorse-eng commented on code in PR #13082: URL: https://github.com/apache/kafka/pull/13082#discussion_r1101981008 ## docs/streams/developer-guide/config-streams.html: ## @@ -778,10 +778,21 @@ rack.aware.assignment.tags max.warmup.replicas - The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping - the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker - traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time - for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + +The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping +the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker +traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time +for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + + +Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during +a rebalance (normally a Probing Rebalance, which occur at a frequency specified by the +probing.rebalance.interval.ms config). This means that the +maximum rate at which Stream Tasks can be migrated from over-burdened Streams Instances to fresher Streams Instances can be determined by +(max.warmup.replicas / +probing.rebalance.interval.ms). If it takes longer than the probing rebalance interval +for the data for a Task to be migrated, then that rate will be lower.
Review Comment: Thanks for the review! What I meant here is that, if it takes a long time (e.g. 60 minutes) for a Task to catch up, then decreasing the probing rebalance interval from, let's say, 30 minutes to 10 minutes won't have any effect on the speed at which you catch up.
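The rate in the proposed doc text, and the 60-minute example in this reply, can be made concrete with a small calculation. This is a simplified model, not the Streams assignor's logic: `WarmupMigrationRate` is a hypothetical name, and the model assumes each warmup slot is reused immediately and that a warmup is promoted at the first probing rebalance after it catches up, so one promotion per slot takes ceil(catchUp / interval) probing intervals (at least one).

```java
// Simplified model (an assumption, not Streams' assignor logic): each warmup
// slot is reused immediately, and a caught-up warmup is promoted at the first
// probing rebalance after it catches up.
final class WarmupMigrationRate {
    private WarmupMigrationRate() {}

    static double maxTasksPerHour(int maxWarmupReplicas, long probingIntervalMs, long catchUpMs) {
        // Probing intervals one warmup occupies its slot: ceil(catchUp / interval), at least 1.
        long intervals = Math.max(1L, (catchUpMs + probingIntervalMs - 1) / probingIntervalMs);
        double msPerPromotion = (double) intervals * probingIntervalMs;
        return maxWarmupReplicas * 3_600_000.0 / msPerPromotion;
    }
}
```

Under this model, 2 warmup replicas with a 60-minute catch-up give 2 tasks per hour for both a 30-minute and a 10-minute probing interval, matching the point above. A shorter interval only helps by cutting the idle time between a warmup catching up and the next probing rebalance: for a 45-minute catch-up, 10-minute probing gives 2.4 tasks per hour versus 2 for 30-minute probing.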
[GitHub] [kafka] mjsax commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
mjsax commented on code in PR #13082: URL: https://github.com/apache/kafka/pull/13082#discussion_r1101930903 ## docs/streams/developer-guide/config-streams.html: ## @@ -778,10 +778,21 @@ rack.aware.assignment.tags max.warmup.replicas - The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping - the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker - traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time - for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + +The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping Review Comment: ```suggestion The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping ``` ## docs/streams/developer-guide/config-streams.html: ## @@ -778,10 +778,21 @@ rack.aware.assignment.tags max.warmup.replicas - The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping - the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker - traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time - for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+ +The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping +the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker +traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time +for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + + +Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during Review Comment: ```suggestion Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup task can only be promoted to an active task during ``` ## docs/streams/developer-guide/config-streams.html: ## @@ -778,10 +778,21 @@ rack.aware.assignment.tags max.warmup.replicas - The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping - the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker - traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time - for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + +The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping +the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker +traffic and cluster state can be used for high availability.
Increasing this will allow Streams to warm up more tasks at once, speeding up the time +for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + + +Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during +a rebalance (normally a Probing Rebalance, which occur at a frequency specified by the Review Comment: ```suggestion a rebalance (normally a so-called probing rebalance, w
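To make the throttling described in this config doc concrete, here is a minimal Java sketch. It assumes, purely for illustration, that a fixed number of tasks must move and that each warmup replica catches up within one probing rebalance interval; real assignments also depend on lag and the assignor's balancing. The config keys `num.standby.replicas`, `max.warmup.replicas` and `probing.rebalance.interval.ms` are real Kafka Streams settings; the class `WarmupThrottleSketch` and its helpers are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper illustrating how max.warmup.replicas throttles warmups.
// Simplifying assumption: every warmup catches up within one probing rebalance interval.
class WarmupThrottleSketch {

    // Builds Streams properties; the keys are real Kafka Streams config names.
    static Properties streamsProps(int maxWarmupReplicas, long probingIntervalMs) {
        if (maxWarmupReplicas < 1) {
            // The config docs state that max.warmup.replicas must be at least 1.
            throw new IllegalArgumentException("max.warmup.replicas must be at least 1");
        }
        Properties props = new Properties();
        props.setProperty("num.standby.replicas", "1");
        props.setProperty("max.warmup.replicas", Integer.toString(maxWarmupReplicas));
        props.setProperty("probing.rebalance.interval.ms", Long.toString(probingIntervalMs));
        return props;
    }

    // Rounds of warmups needed when at most maxWarmupReplicas tasks warm up at once
    // (ceiling division).
    static int rounds(int tasksToMove, int maxWarmupReplicas) {
        return (tasksToMove + maxWarmupReplicas - 1) / maxWarmupReplicas;
    }

    // Rough estimate of the time until all moved tasks are active, assuming one round
    // of warmups is promoted per probing rebalance.
    static long estimatedMs(int tasksToMove, int maxWarmupReplicas, long probingIntervalMs) {
        return rounds(tasksToMove, maxWarmupReplicas) * probingIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(2, 600_000L);
        System.out.println(props.getProperty("max.warmup.replicas")); // 2
        System.out.println(rounds(10, 2));                              // 5
        System.out.println(estimatedMs(10, 2, 600_000L));               // 3000000
    }
}
```

Under these assumptions, raising `max.warmup.replicas` from 2 to 5 would cut the 10-task example from five rounds to two, at the cost of more concurrent restore traffic.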
[GitHub] [kafka] cmccabe opened a new pull request, #13226: MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient
cmccabe opened a new pull request, #13226: URL: https://github.com/apache/kafka/pull/13226 Do not use HIGHEST_SUPPORTED_VERSION in ZkMigrationClient because it will do the wrong thing when more MV options are added in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
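The reasoning in this PR description can be illustrated with a small Java sketch. `MetadataVersionSketch` is a hypothetical stand-in, not Kafka's `MetadataVersion`: a "highest" constant resolves to whatever the last declared value is, so code that uses it silently changes behavior when a new value is appended, while code that names a specific version does not. Pinning is only one possible alternative; the sketch does not claim this is what the PR does.

```java
// Hypothetical stand-in for a metadata version enum (not Kafka's MetadataVersion).
enum MetadataVersionSketch {
    V1, V2, V3; // imagine V4 being appended in a later release

    // Always the last declared constant, so its meaning shifts whenever a version is added.
    static final MetadataVersionSketch HIGHEST_SUPPORTED = values()[values().length - 1];
}

class MigrationSketch {
    // Fragile: the result depends on how many constants exist in this build.
    static MetadataVersionSketch fragileChoice() {
        return MetadataVersionSketch.HIGHEST_SUPPORTED;
    }

    // Stable: pinned to the version this code was written and tested against.
    static MetadataVersionSketch pinnedChoice() {
        return MetadataVersionSketch.V3;
    }

    public static void main(String[] args) {
        System.out.println(fragileChoice()); // V3 today, but V4 once V4 is declared
        System.out.println(pinnedChoice());  // V3 regardless of later additions
    }
}
```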
[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
Hangleton commented on PR #13214: URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424741428 Thank you for your review, Frederico (@fvaleri). I pushed the changes to address all your comments.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
Hangleton commented on code in PR #13214: URL: https://github.com/apache/kafka/pull/13214#discussion_r1101958375 ## tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java: ## @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.regex.Pattern; + +import static java.util.Arrays.stream; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG; 
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.utils.Utils.loadProps; +import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions; +import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs; + +/** + * Sends {@link ProducerRecord} generated from lines read on the standard input. + */ +public class ConsoleProducer { +public static void main(String[] args) { +ConsoleProducer consoleProducer = new ConsoleProducer(); +consoleProducer.start(args); +} + +void start(String[] args) { +try { +ConsoleProducerConfig config = new ConsoleProducerConfig(args); +MessageReader reader = createMessageReader(config); +reader.init(System.in, config.getReaderProps()); + +KafkaProducer producer = createKafkaProducer(config.getProducerProps()); +Exit.addShutdownHook("producer-shutdown-hook", producer::close); + +ProducerRecord record; +do { +record = reader.readMessage(); +if (record != null) { +send(producer, record, config.sync()); +} +} while (record != null); + +} catch (OptionException e) { +System.err.println(e.getMessage()); +Exit.exit(1); + +} catch (Exception e) { +e.printStackTrace(); +Exit.exit(1); +} +Exit.exit(0); +} + +// VisibleForTesting +KafkaProducer createKafkaProducer(Properties props) { +return new KafkaProducer<>(props); +} + +// VisibleForTesting +MessageReader createMessageReader(ConsoleProducerConfig config) throws ReflectiveOperationException { +return (MessageReader) Class.forName(config.reader
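The `start()` loop above reads records until the reader returns `null` and sends each one. The class below sketches the line-reading side without Kafka on the classpath. When key parsing is enabled, it splits each line into a key and a value, in the spirit of the console producer's `parse.key` and `key.separator` reader properties (default separator: a tab). `LineReaderSketch` is hypothetical. It returns plain string pairs instead of `ProducerRecord`s and simply throws when a separator is missing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free sketch of a line-based message reader.
class LineReaderSketch {
    private final BufferedReader reader;
    private final boolean parseKey;
    private final String keySeparator;

    LineReaderSketch(BufferedReader reader, boolean parseKey, String keySeparator) {
        this.reader = reader;
        this.parseKey = parseKey;
        this.keySeparator = keySeparator;
    }

    // Next (key, value) pair, or null at end of input: the same "read until null"
    // contract that the do/while loop in start() relies on.
    Map.Entry<String, String> readMessage() throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        if (!parseKey) {
            return new SimpleEntry<String, String>(null, line); // whole line is the value
        }
        int idx = line.indexOf(keySeparator);
        if (idx < 0) {
            throw new IllegalArgumentException("No key separator found on line: " + line);
        }
        return new SimpleEntry<String, String>(line.substring(0, idx),
                line.substring(idx + keySeparator.length()));
    }

    // Drains the input; where this collects pairs, a real producer would send them.
    static List<Map.Entry<String, String>> readAll(String input, boolean parseKey, String separator) {
        LineReaderSketch r = new LineReaderSketch(
                new BufferedReader(new StringReader(input)), parseKey, separator);
        List<Map.Entry<String, String>> out = new ArrayList<>();
        try {
            Map.Entry<String, String> record;
            while ((record = r.readMessage()) != null) {
                out.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```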
[GitHub] [kafka] guozhangwang commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
guozhangwang commented on PR #13202: URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424717333 > @guozhangwang If you don't mind, I will do it as a follow-up. Created https://issues.apache.org/jira/browse/KAFKA-14702 to track this. Yeah totally, like I said it's "independent of this PR" :) Feel free to merge this one.
[jira] [Updated] (KAFKA-14701) More broker side partition assignor to common
[ https://issues.apache.org/jira/browse/KAFKA-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14701: Summary: More broker side partition assignor to common (was: More broker side assignor to common) > More broker side partition assignor to common > - > > Key: KAFKA-14701 > URL: https://issues.apache.org/jira/browse/KAFKA-14701 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > Before releasing KIP-848, we need to move the server side partition assignor > to its final location in common. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14701) More broker side assignor to common
[ https://issues.apache.org/jira/browse/KAFKA-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14701: Description: Before releasing KIP-848, we need to move the server side partition assignor to its final location in common. > More broker side assignor to common > --- > > Key: KAFKA-14701 > URL: https://issues.apache.org/jira/browse/KAFKA-14701 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > Before releasing KIP-848, we need to move the server side partition assignor > to its final location in common.
[GitHub] [kafka] vcrfxia commented on pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
vcrfxia commented on PR #13189: URL: https://github.com/apache/kafka/pull/13189#issuecomment-1424701063 Thanks for your review, @mjsax! Responded to your comments inline and also pushed a commit just now containing the requested changes.
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101924823 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: I think that I will keep the current name for now. I am not fully convinced by `GroupCoordinatorPartitionAssignor`
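Under KIP-848, a server-side assignor computes the group's assignment inside the group coordinator. The following Java code is a deliberately simplified, hypothetical model of that idea. `AssignorSketch` and `RoundRobinAssignorSketch` are invented names, and this is not the `PartitionAssignor` interface added in the PR, whose methods are not shown here. The sketch takes member ids and per-topic partition counts and deals partitions out round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a server-side assignor; not Kafka's API.
interface AssignorSketch {
    String name();

    // members: subscribed member ids; topicPartitionCounts: topic -> number of partitions.
    // Returns member id -> topic -> assigned partition numbers.
    Map<String, Map<String, List<Integer>>> assign(List<String> members,
                                                   Map<String, Integer> topicPartitionCounts);
}

class RoundRobinAssignorSketch implements AssignorSketch {
    @Override
    public String name() {
        return "round-robin-sketch";
    }

    @Override
    public Map<String, Map<String, List<Integer>>> assign(List<String> members,
                                                          Map<String, Integer> topicPartitionCounts) {
        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new TreeMap<>());
        }
        if (members.isEmpty()) {
            return result;
        }
        // Sort topics so the assignment is deterministic across calls.
        Map<String, Integer> sortedTopics = new TreeMap<>(topicPartitionCounts);
        int next = 0;
        for (Map.Entry<String, Integer> topic : sortedTopics.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String member = members.get(next % members.size());
                result.get(member).computeIfAbsent(topic.getKey(), t -> new ArrayList<>()).add(p);
                next++;
            }
        }
        return result;
    }
}
```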
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101924540 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection> records) { -throw new UnsupportedOperationException("not yet implemented"); +// advance stream time to the max timestamp in the batch +for (final ConsumerRecord record : records) { +observedStreamTime = Math.max(observedStreamTime, record.timestamp()); +} + +final VersionedStoreClient restoreClient = restoreWriteBuffer.getClient(); Review Comment: The VersionedStoreSegment implementation used by the restore client (WriteBufferSegmentWithDbFallback) is currently private to the write buffer class, since the RocksDBVersionedStore doesn't care what the type is; all the outer class needs are the methods provided by the RocksDBVersionedStoreClient interface itself. So I've left the type out in order to avoid polluting the outer class with extra info it doesn't need. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + +private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + +// write buffer for latest value store. value type is Optional in order to track tombstones +// which must be written to the underlying store. +private final Map> latestValueWriteBuffer; +// map from segment id to write buffer. 
segments are stored in reverse-sorted order, +// so getReverseSegments() is more efficient +private final TreeMap segmentsWriteBuffer; +private final BatchWritingVersionedStoreClient dbClient; +private final RocksDBVersionedStoreRestoreClient restoreClient; + +/** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ +RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) { +this.dbClient = Objects.requireNonNull(dbClient); + +this.latestValueWriteBuffer = new HashMap<>(); +// store in reverse-sorted order, to make getReverseSegments() more efficient +this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); +this.restoreClient = new RocksDBVersionedStoreRestoreClient(); +} + +/** + * @return client for writing
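Two buffering ideas from this diff can be sketched without RocksDB. First, `restoreBatch()` advances stream time to the maximum timestamp in the batch before writing. Second, the latest-value buffer keeps tombstones as `Optional.empty()` so they are still written, and the segment map uses the comparator `(x, y) -> Long.compare(y, x)` so it iterates newest segment first. This is a simplified, hypothetical model: `RestoreBufferSketch` and its methods are not Kafka classes, keys are `String` rather than `Bytes`, and the initial stream time of `Long.MIN_VALUE` is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified model of the restore write buffer (String keys instead of Bytes).
class RestoreBufferSketch {
    // Latest value buffer: Optional.empty() records a tombstone that must still be written.
    final Map<String, Optional<byte[]>> latestValueBuffer = new HashMap<>();
    // Segment id -> buffered entries; the reversed comparator iterates newest segment first.
    final TreeMap<Long, Map<String, byte[]>> segmentsBuffer =
            new TreeMap<>((x, y) -> Long.compare(y, x));
    // Assumed initial value meaning "no stream time observed yet".
    long observedStreamTime = Long.MIN_VALUE;

    // Like the first step of restoreBatch(): advance stream time to the batch maximum.
    void advanceStreamTime(List<Long> batchTimestamps) {
        for (long timestamp : batchTimestamps) {
            observedStreamTime = Math.max(observedStreamTime, timestamp);
        }
    }

    // A null value is buffered as Optional.empty() instead of being dropped.
    void putLatest(String key, byte[] valueOrNull) {
        latestValueBuffer.put(key, Optional.ofNullable(valueOrNull));
    }

    void putSegment(long segmentId, String key, byte[] value) {
        segmentsBuffer.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    // Segment ids newest first, analogous to getReverseSegments().
    List<Long> reverseSegmentIds() {
        return new ArrayList<>(segmentsBuffer.keySet());
    }
}
```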
[GitHub] [kafka] dajac commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on PR #13202: URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424699307 @guozhangwang If you don't mind, I will do it as a follow-up. Created https://issues.apache.org/jira/browse/KAFKA-14702 to track this.
[jira] [Created] (KAFKA-14701) More broker side assignor to common
David Jacot created KAFKA-14701: --- Summary: More broker side assignor to common Key: KAFKA-14701 URL: https://issues.apache.org/jira/browse/KAFKA-14701 Project: Kafka Issue Type: Sub-task Reporter: David Jacot
[jira] [Created] (KAFKA-14702) Extend server side assignor to support rack aware replica placement
David Jacot created KAFKA-14702: --- Summary: Extend server side assignor to support rack aware replica placement Key: KAFKA-14702 URL: https://issues.apache.org/jira/browse/KAFKA-14702 Project: Kafka Issue Type: Sub-task Reporter: David Jacot
[GitHub] [kafka] dajac commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on PR #13202: URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424692244 @guozhangwang That's a good point. We need to add the list of replicas with their rack ids to `AssignmentTopicMetadata`. That would replace the `numPartitions`. We missed this part in KIP-848 but we already added `rackId` to `AssignmentMemberSpec`.
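One way to picture the proposed change is to let topic metadata carry, for each partition, the racks hosting its replicas instead of a bare `numPartitions`. The assignor can then prefer members whose `rackId` matches one of those racks. The Java code below is a hypothetical greedy sketch, not the design tracked in KAFKA-14702, and `RackAwareSketch` is an invented name. Always preferring a rack-local member can unbalance the assignment, and a real assignor would need to trade locality against balance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical greedy rack-aware assignment; not the design of KAFKA-14702.
class RackAwareSketch {
    // partitionReplicaRacks.get(p): racks hosting replicas of partition p
    // (this list plays the role that a bare partition count played before).
    // memberRacks: member id -> rack id of that member.
    static Map<String, List<Integer>> assign(List<Set<String>> partitionReplicaRacks,
                                             Map<String, String> memberRacks) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String member : memberRacks.keySet()) {
            result.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionReplicaRacks.size(); p++) {
            Set<String> racks = partitionReplicaRacks.get(p);
            String best = null;
            boolean bestLocal = false;
            for (String member : result.keySet()) {
                boolean local = racks.contains(memberRacks.get(member));
                int load = result.get(member).size();
                // Prefer a rack-local member; among equally local ones, the least loaded.
                if (best == null
                        || (local && !bestLocal)
                        || (local == bestLocal && load < result.get(best).size())) {
                    best = member;
                    bestLocal = local;
                }
            }
            if (best != null) {
                result.get(best).add(p);
            }
        }
        return result;
    }
}
```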
[jira] [Updated] (KAFKA-14700) Produce request interceptors
[ https://issues.apache.org/jira/browse/KAFKA-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mariassy updated KAFKA-14700: --- Description: See [KIP-905|https://cwiki.apache.org/confluence/display/KAFKA/KIP-905%3A+Broker+interceptors] (was: See KIP-905) > Produce request interceptors > > > Key: KAFKA-14700 > URL: https://issues.apache.org/jira/browse/KAFKA-14700 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: David Mariassy >Assignee: David Mariassy >Priority: Major > > See > [KIP-905|https://cwiki.apache.org/confluence/display/KAFKA/KIP-905%3A+Broker+interceptors]
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101915894 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This + * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may + * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during + * restoration. + * + * The structure of the internals of this write buffer mirrors the structure of the + * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the + * segment stores is buffered in a separate object -- specifically, a map. + */ +public class RocksDBVersionedStoreRestoreWriteBuffer { + +private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class); + +// write buffer for latest value store. value type is Optional in order to track tombstones +// which must be written to the underlying store. +private final Map> latestValueWriteBuffer; +// map from segment id to write buffer. 
segments are stored in reverse-sorted order, +// so getReverseSegments() is more efficient +private final TreeMap segmentsWriteBuffer; +private final BatchWritingVersionedStoreClient dbClient; +private final RocksDBVersionedStoreRestoreClient restoreClient; + +/** + * Creates a new write buffer. + * @param dbClient client for reading from and writing to the underlying persistent store + */ +RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) { +this.dbClient = Objects.requireNonNull(dbClient); + +this.latestValueWriteBuffer = new HashMap<>(); +// store in reverse-sorted order, to make getReverseSegments() more efficient +this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x)); +this.restoreClient = new RocksDBVersionedStoreRestoreClient(); +} + +/** + * @return client for writing to (and reading from) the write buffer + */ +VersionedStoreClient getClient() { +return restoreClient; +} + +/** + * Flushes the contents of the write buffer into the persistent store, and clears the write + * buffer in the process. + * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch} + */ +void flush() throws RocksDBException { + +// flush segments first, as this is consistent with the store always writing to +// older segments/stores before later ones +try (final WriteBatch segmentsBatch = new WriteBatch()) { +final List allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); +if (allSegments.size() > 0) { +// collect entries into write batch +for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { +final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment(); +for (final Map.Entry segmentEntry : bufferSegment.getAll()) { +dbSegment.add
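The ordering in `flush()` works in three steps. All buffered segment entries go into one batch, which is written first. The latest value store is written next, and then the buffer is cleared. The Java code below models that ordering by recording each "batch" as a list of operations. `FlushOrderSketch` is hypothetical: the lists stand in for RocksDB `WriteBatch` writes, and the `<delete>` marker for a tombstone is an invented placeholder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model of the flush ordering; lists stand in for RocksDB WriteBatch writes.
class FlushOrderSketch {
    final Map<String, Optional<String>> latestValueBuffer = new HashMap<>();
    final TreeMap<Long, Map<String, String>> segmentsBuffer =
            new TreeMap<>((x, y) -> Long.compare(y, x));
    // Each inner list is one committed batch of "store:key=value" operations.
    final List<List<String>> committedBatches = new ArrayList<>();

    void flush() {
        // 1. Every buffered segment entry goes into a single batch, written first.
        List<String> segmentsBatch = new ArrayList<>();
        for (Map.Entry<Long, Map<String, String>> segment : segmentsBuffer.entrySet()) {
            for (Map.Entry<String, String> e : segment.getValue().entrySet()) {
                segmentsBatch.add("segment-" + segment.getKey() + ":" + e.getKey() + "=" + e.getValue());
            }
        }
        if (!segmentsBatch.isEmpty()) {
            committedBatches.add(segmentsBatch);
        }
        // 2. The latest value store is written second; an empty Optional becomes a delete.
        List<String> latestBatch = new ArrayList<>();
        for (Map.Entry<String, Optional<String>> e : latestValueBuffer.entrySet()) {
            latestBatch.add("latest:" + e.getKey() + "=" + e.getValue().orElse("<delete>"));
        }
        if (!latestBatch.isEmpty()) {
            committedBatches.add(latestBatch);
        }
        // 3. Flushing also clears the write buffer.
        segmentsBuffer.clear();
        latestValueBuffer.clear();
    }
}
```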
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101914047 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -333,10 +384,34 @@ interface VersionedStoreSegment { long segmentIdForTimestamp(long timestamp); } +/** + * A {@link VersionedStoreClient} which additionally supports batch writes into its latest + * value store. + * + * @param the segment type used by this client + */ +interface BatchWritingVersionedStoreClient extends VersionedStoreClient { Review Comment: We could, yeah. It just means we'd need to make RocksDBVersionedStoreClient non-private and require that RocksDBVersionedStoreRestoreWriteBuffer is specifically passed an instance of RocksDBVersionedStoreClient rather than any BatchWritingVersionedStoreClient. I'll make the change.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog
vcrfxia commented on code in PR #13189: URL: https://github.com/apache/kafka/pull/13189#discussion_r1101910583 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,877 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A persistent, versioned key-value store based on RocksDB. 
+ * + * This store implementation consists of a "latest value store" and "segment stores." The latest + * record version for each key is stored in the latest value store, while older record versions + * are stored in the segment stores. Conceptually, each record version has two associated + * timestamps: + * + * a {@code validFrom} timestamp. This timestamp is explicitly associated with the record + * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store; + * i.e., this is the record's timestamp. + * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion) + * associated with the same key, and is implicitly associated with the record. This timestamp + * can change as new records are inserted into the store. + * + * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and + * can change as new record versions are inserted into the store (and validTo changes as a result). + * + * Old record versions are stored in segment stores according to their validTo timestamps. This + * allows for efficient expiry of old record versions, as entire segments can be dropped from the + * store at a time, once the records contained in the segment are no longer relevant based on the + * store's history retention (for an explanation of "history retention", see + * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single + * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}. + */ +public class RocksDBVersionedStore implements VersionedKeyValueStore { +private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class); +// a marker to indicate that no record version has yet been found as part of an ongoing +// put() procedure. any value which is not a valid record timestamp will do. +private stat
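The Java code below is a small model of the layout this javadoc describes. For one key, every version except the latest gets a validity interval `[validFrom, validTo)` and is placed in the segment that covers its `validTo`. The sketch assumes segment id = `validTo / segmentIntervalMs`, ignores tombstones, and uses a deliberately conservative expiry rule. `VersionedLayoutSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of validity intervals and segment placement for ONE key.
// Assumptions: segment id = validTo / segmentIntervalMs; tombstones are ignored.
class VersionedLayoutSketch {

    // An older (non-latest) record version and the segment it would live in.
    static final class OldVersion {
        final long validFrom; // the record's own timestamp (inclusive)
        final long validTo;   // timestamp of the next version of the same key (exclusive)
        final long segmentId; // chosen by validTo, not validFrom

        OldVersion(long validFrom, long validTo, long segmentId) {
            this.validFrom = validFrom;
            this.validTo = validTo;
            this.segmentId = segmentId;
        }
    }

    // timestamps: validFrom of every version of one key, in ascending order.
    // The last one is the latest version (latest value store, open-ended validTo),
    // so it is not returned here.
    static List<OldVersion> olderVersions(long[] timestamps, long segmentIntervalMs) {
        List<OldVersion> result = new ArrayList<>();
        for (int i = 0; i + 1 < timestamps.length; i++) {
            long validTo = timestamps[i + 1];
            result.add(new OldVersion(timestamps[i], validTo, validTo / segmentIntervalMs));
        }
        return result;
    }

    // Conservative expiry: true only if every possible validTo in the segment lies
    // strictly before the history retention boundary (streamTime - historyRetention).
    static boolean segmentExpired(long segmentId, long segmentIntervalMs,
                                  long observedStreamTime, long historyRetentionMs) {
        long maxValidToInSegment = (segmentId + 1) * segmentIntervalMs - 1;
        return maxValidToInSegment < observedStreamTime - historyRetentionMs;
    }
}
```

With versions at timestamps 10, 25 and 40 and a 20 ms segment interval, the version written at 10 is valid over `[10, 25)` and lands in segment 1. The version at 25 covers `[25, 40)` and lands in segment 2. The version at 40 stays in the latest value store.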
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
vcrfxia commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1101902156 ## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; + +/** + * A key-value store that stores multiple record versions per key, and supports timestamp-based + * retrieval operations to return the latest record (per key) as of a specified timestamp. + * Only one record is stored per key and timestamp, i.e., a second call to + * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first. + * + * Each store instance has an associated, fixed-duration "history retention" which specifies + * how long old record versions should be kept for. In particular, a versioned store guarantees + * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp + * bound is within history retention of the current observed stream time. 
(Queries with timestamp + * bound older than the specified history retention are considered invalid.) + * + * @param <K> The key type + * @param <V> The value type + */ +public interface VersionedKeyValueStore<K, V> extends StateStore { + +/** + * Add a new record version associated with this key. + * + * @param key The key + * @param value The value, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as a delete + * @param timestamp The timestamp for this record version + * @throws NullPointerException If {@code null} is used for key. + */ +void put(K key, V value, long timestamp); + +/** + * Delete the value associated with this key from the store, at the specified timestamp + * (if there is such a value), and return the deleted value. + * + * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)} + * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}. + * + * @param key The key + * @param timestamp The timestamp for this delete + * @return The value and timestamp of the latest record associated with this key + * as of the deletion timestamp (inclusive), or {@code null} if any of + * (1) the store contains no records for this key, (2) the latest record + * for this key as of the deletion timestamp is a tombstone, or + * (3) the deletion timestamp is older than this store's history retention + * (i.e., this store no longer contains data for the provided timestamp). + * @throws NullPointerException If {@code null} is used for key. + */ +VersionedRecord<V> delete(K key, long timestamp); + +/** + * Get the latest (by timestamp) record associated with this key. + * + * @param key The key to fetch + * @return The value and timestamp of the latest record associated with this key, or + * {@code null} if either (1) the store contains no records for this key or (2) the + * latest record for this key is a tombstone. + * @throws NullPointerException If null is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized + */ +VersionedRecord<V> get(K key); + +/** + * Get the latest record associated with this key with timestamp not exceeding the specified + * timestamp bound. Review Comment: Hm. Currently "validFrom" and "validTo" are not mentioned anywhere in the public-facing interfaces (VersionedKeyValueStore, VersionedRecord, etc.) in javadocs or as parameter names; they are only "officially" introduced as concepts in the javadocs for RocksDBVersionedStore and its implementation helpers. I'm a bit hesitant to introduce these concepts in the public-facing docs at this time if the only purpose is to clarify what's returned from `get(key, asOfTimestamp)`, particularly because during KIP discussion one p
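The contract quoted above (one version per key and timestamp, `get(key, asOfTimestamp)` returning the latest version at or before the bound, `delete` behaving like `get` followed by a tombstone `put`, and query bounds older than history retention treated as invalid) can be sketched with one sorted map per key. This is a hypothetical in-memory illustration, not `RocksDBVersionedStore`: `InMemoryVersionedStoreSketch` and its nested `Versioned` type are invented stand-ins for the store and `VersionedRecord`, timestamps are assumed to be non-negative, and expired versions are never pruned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory sketch of the VersionedKeyValueStore contract (not the RocksDB store).
 * Assumes non-negative timestamps; a null value is kept as a tombstone; nothing is ever pruned.
 */
class InMemoryVersionedStoreSketch<K, V> {

    /** Minimal stand-in for VersionedRecord: a value plus the timestamp it was put with. */
    static final class Versioned<T> {
        final T value;
        final long timestamp;

        Versioned(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long historyRetention;
    private final Map<K, TreeMap<Long, V>> versionsByKey = new HashMap<>();
    private long observedStreamTime = -1L; // -1 means no record has been put yet

    InMemoryVersionedStoreSketch(long historyRetention) {
        this.historyRetention = historyRetention;
    }

    /** Adds a version; a second put with the same key and timestamp replaces the first. */
    void put(K key, V value, long timestamp) {
        if (key == null) throw new NullPointerException("key cannot be null");
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versionsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /**
     * Latest version with timestamp <= asOfTimestamp, or null if there is none, it is a
     * tombstone, or asOfTimestamp is older than history retention allows.
     */
    Versioned<V> get(K key, long asOfTimestamp) {
        if (key == null) throw new NullPointerException("key cannot be null");
        if (asOfTimestamp < observedStreamTime - historyRetention) {
            return null; // query bound outside history retention: treated as invalid
        }
        TreeMap<Long, V> history = versionsByKey.get(key);
        if (history == null) return null;
        Map.Entry<Long, V> latest = history.floorEntry(asOfTimestamp);
        if (latest == null || latest.getValue() == null) return null;
        return new Versioned<>(latest.getValue(), latest.getKey());
    }

    /** Latest version overall (no timestamp bound). */
    Versioned<V> get(K key) {
        return get(key, Long.MAX_VALUE);
    }

    /** Same as get(key, timestamp) followed by put(key, null, timestamp). */
    Versioned<V> delete(K key, long timestamp) {
        Versioned<V> deleted = get(key, timestamp);
        put(key, null, timestamp);
        return deleted;
    }
}
```

`TreeMap.floorEntry` performs the "latest as of" lookup in a single call. The real store instead splits versions between a latest-value store and segment stores, as the `RocksDBVersionedStore` javadoc in this thread describes.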
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101900196 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: Yeah I was just thinking about that, but didn't know if the name would be awkward. Thinking about it though, it might be better to clarify through naming to save headaches in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101899467 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface PartitionAssignor { + Review Comment: Seems reasonable to me.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
vcrfxia commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1101897601 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A persistent, versioned key-value store based on RocksDB. + * + * This store implementation consists of a "latest value store" and "segment stores." The latest + * record version for each key is stored in the latest value store, while older record versions + * are stored in the segment stores. Conceptually, each record version has two associated + * timestamps: + * + * a {@code validFrom} timestamp. 
This timestamp is explicitly associated with the record + * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)} call to the store; + * i.e., this is the record's timestamp. + * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion) + * associated with the same key, and is implicitly associated with the record. This timestamp + * can change as new records are inserted into the store. + * + * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and + * can change as new record versions are inserted into the store (and validTo changes as a result). + * + * Old record versions are stored in segment stores according to their validTo timestamps. This + * allows for efficient expiry of old record versions, as entire segments can be dropped from the + * store at a time, once the records contained in the segment are no longer relevant based on the + * store's history retention (for an explanation of "history retention", see + * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single + * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}. + */ +public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> { +private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class); +// a marker to indicate that no record version has yet been found as part of an ongoing +// put() procedure. any value which is not a valid record timestamp will do. +private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE; + +private final String name; +private final long historyRetention; +private final RocksDBMetricsRecorder metricsRecorder; + +private final RocksDBStore latestValueStore; +private final LogicalKeyValueSegments segmentStores; +private final LatestValueSchema latestValueSchema; +private final SegmentValueSchema segmentValueSchema; +private final
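The validFrom/validTo bookkeeping described in the javadoc above can be made concrete in a few lines: each version is valid until the next version (or deletion) of the same key, older versions are bucketed by validTo, and a whole segment becomes droppable once no query inside history retention could return anything in it. This is a hypothetical sketch, not the store's code. `ValidityIntervalSketch` is an invented name, the segment id formula `validTo / segmentInterval` is an assumption, and the "oldest still-valid query bound" is taken to be `streamTime - historyRetention`, following the `VersionedKeyValueStore` javadoc. In the real store the latest version lives in the latest-value store rather than in a segment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of validity intervals and validTo-based segment expiry. */
class ValidityIntervalSketch {

    /** One record version with its validity interval [validFrom, validTo). */
    static final class Version {
        final String value;   // null for a tombstone
        final long validFrom; // the timestamp passed to put()
        final long validTo;   // timestamp of the next version, or Long.MAX_VALUE for the latest

        Version(String value, long validFrom, long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }
    }

    /** Each version stays valid until the next version (or deletion) of the same key. */
    static List<Version> withValidity(TreeMap<Long, String> versionsByTimestamp) {
        List<Version> result = new ArrayList<>();
        long nextTimestamp = Long.MAX_VALUE; // the latest version has no successor yet
        // walk newest to oldest so each version's successor timestamp is already known
        for (Map.Entry<Long, String> e : versionsByTimestamp.descendingMap().entrySet()) {
            result.add(0, new Version(e.getValue(), e.getKey(), nextTimestamp));
            nextTimestamp = e.getKey();
        }
        return result;
    }

    /** Assumed bucketing: an older version goes to the segment covering its validTo. */
    static long segmentId(long validTo, long segmentInterval) {
        return validTo / segmentInterval;
    }

    /**
     * A version can no longer be returned by a valid query once
     * validTo <= streamTime - historyRetention, so a segment whose largest possible
     * validTo meets that bound can be dropped as a whole.
     */
    static boolean segmentExpired(long segmentId, long segmentInterval, long streamTime, long historyRetention) {
        long largestValidToInSegment = (segmentId + 1) * segmentInterval - 1;
        return largestValidToInSegment <= streamTime - historyRetention;
    }
}
```

Bucketing by validTo rather than validFrom is what makes segment-level expiry safe: a version that started long ago but was superseded only recently still lands in a recent segment.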
[GitHub] [kafka] lmr3796 commented on pull request #13187: MINOR: Log lastCaughtUpTime on ISR shrinkage
lmr3796 commented on PR #13187: URL: https://github.com/apache/kafka/pull/13187#issuecomment-1424662401 @clolov for visibility
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101889893 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: That’s right. We could perhaps name this one GroupCoordinatorPartitionAssignor to make the difference clearer. Having the same name seems confusing. What do you think?
[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101888535 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface PartitionAssignor { + Review Comment: Yeah. I started like this but I found it too heavy. What do you think? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.Map; +import java.util.Objects; + +/** + * The assignment specification for a consumer group. + */ +public class AssignmentSpec { +/** + * The members keyed by member id. + */ +final Map members; + +/** + * The topics' metadata keyed by topic id + */ +final Map topics; + +public AssignmentSpec( +Map members, +Map topics +) { +Objects.requireNonNull(members); +Objects.requireNonNull(topics); + Review Comment: Will remove it.
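`AssignmentSpec` above carries the group's members keyed by member id and the topics' metadata keyed by topic id (the generic type arguments were lost in this archive). To make that input shape concrete, here is a hypothetical, simplified server-side assignor. `SimpleAssignmentSpec`, `SimpleServerSideAssignor` and `RoundRobinSketchAssignor` are invented names, topic ids are plain strings rather than `Uuid`, and the output maps member id to topic id to partitions. It is not the KIP-848 interface under review.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for AssignmentSpec. */
class SimpleAssignmentSpec {
    final Map<String, Set<String>> subscriptionsByMember; // member id -> subscribed topic ids
    final Map<String, Integer> partitionsByTopic;         // topic id -> partition count

    SimpleAssignmentSpec(Map<String, Set<String>> subscriptionsByMember, Map<String, Integer> partitionsByTopic) {
        this.subscriptionsByMember = Objects.requireNonNull(subscriptionsByMember);
        this.partitionsByTopic = Objects.requireNonNull(partitionsByTopic);
    }
}

/** Hypothetical server-side assignor shape: spec in, member -> topic -> partitions out. */
interface SimpleServerSideAssignor {
    String name();

    Map<String, Map<String, Set<Integer>>> assign(SimpleAssignmentSpec spec);
}

/** Round-robins each topic's partitions over that topic's subscribers, in sorted member order. */
class RoundRobinSketchAssignor implements SimpleServerSideAssignor {
    @Override
    public String name() {
        return "round-robin-sketch";
    }

    @Override
    public Map<String, Map<String, Set<Integer>>> assign(SimpleAssignmentSpec spec) {
        Map<String, Map<String, Set<Integer>>> result = new TreeMap<>();
        for (String member : spec.subscriptionsByMember.keySet()) {
            result.put(member, new TreeMap<>());
        }
        // sorted iteration keeps the output deterministic regardless of input map types
        for (String topic : new TreeSet<>(spec.partitionsByTopic.keySet())) {
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, Set<String>> e : new TreeMap<>(spec.subscriptionsByMember).entrySet()) {
                if (e.getValue().contains(topic)) subscribers.add(e.getKey());
            }
            if (subscribers.isEmpty()) continue; // nobody subscribed: leave the topic unassigned
            int partitionCount = spec.partitionsByTopic.get(topic);
            for (int p = 0; p < partitionCount; p++) {
                String owner = subscribers.get(p % subscribers.size());
                result.get(owner).computeIfAbsent(topic, t -> new TreeSet<>()).add(p);
            }
        }
        return result;
    }
}
```

Keeping the spec a plain immutable-style POJO, as the diff does, lets an assignor stay a pure function of its input, which is convenient for testing.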
[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
Hangleton commented on code in PR #13214: URL: https://github.com/apache/kafka/pull/13214#discussion_r1101887714 ## tools/src/main/java/org/apache/kafka/tools/MessageReader.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.InputStream; +import java.util.Properties; + +/** + * Typical implementations of this interface convert data from an {@link InputStream} received via + * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each + * invocation of `{@link MessageReader#readMessage()}`. + * + * This is used by the {@link ConsoleProducer}. + */ +public interface MessageReader { Review Comment: Apologies, that is right. Thanks for correcting. On that note, this interface moved from `kafka.common.MessageReader` to `org.apache.kafka.tools.MessageReader`. Any external implementation will have to be updated accordingly. What is the convention to follow in this case: preserve the same package to avoid breaking dependencies, or change the package to match the new structure?
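The `MessageReader` javadoc describes the contract: `init(InputStream, Properties)` hands over the input, and each `readMessage()` call turns the next chunk of it into a record. Below is a minimal, dependency-free sketch of a line-oriented reader with that shape. `SketchRecord` is a hypothetical stand-in for `ProducerRecord<byte[], byte[]>`, returning `null` at end of input is this sketch's convention, and the `topic`, `parse.key` and `key.separator` property names are assumed to mirror what the console producer's line reader accepts; treat them as assumptions, not as the API of the moved class.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical stand-in for ProducerRecord<byte[], byte[]>. */
final class SketchRecord {
    final String topic;
    final byte[] key;   // null when keys are not parsed
    final byte[] value;

    SketchRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

/** Line-oriented reader with the init/readMessage shape described above (hypothetical). */
class LineReaderSketch {
    private BufferedReader reader;
    private String topic;
    private boolean parseKey;
    private String keySeparator;

    void init(InputStream input, Properties props) {
        this.reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        this.topic = props.getProperty("topic");
        this.parseKey = Boolean.parseBoolean(props.getProperty("parse.key", "false"));
        this.keySeparator = props.getProperty("key.separator", "\t");
    }

    /** Returns the next record, or null once the input is exhausted. */
    SketchRecord readMessage() {
        try {
            String line = reader.readLine();
            if (line == null) return null;
            if (!parseKey) {
                return new SketchRecord(topic, null, line.getBytes(StandardCharsets.UTF_8));
            }
            int idx = line.indexOf(keySeparator);
            if (idx < 0) throw new IllegalArgumentException("No key separator found in line: " + line);
            return new SketchRecord(topic,
                line.substring(0, idx).getBytes(StandardCharsets.UTF_8),
                line.substring(idx + keySeparator.length()).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would loop on `readMessage()` until it returns `null`, sending each record as it goes.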
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store
vcrfxia commented on code in PR #13188: URL: https://github.com/apache/kafka/pull/13188#discussion_r1101882173 ## streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; + +/** + * A key-value store that stores multiple record versions per key, and supports timestamp-based + * retrieval operations to return the latest record (per key) as of a specified timestamp. + * Only one record is stored per key and timestamp, i.e., a second call to + * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first. + * + * Each store instance has an associated, fixed-duration "history retention" which specifies + * how long old record versions should be kept for. In particular, a versioned store guarantees + * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp + * bound is within history retention of the current observed stream time. 
(Queries with timestamp + * bound older than the specified history retention are considered invalid.) + * + * @param <K> The key type + * @param <V> The value type + */ +public interface VersionedKeyValueStore<K, V> extends StateStore { + +/** + * Add a new record version associated with this key. + * + * @param key The key + * @param value The value, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as a delete Review Comment: Hm... I found this [comment](https://github.com/apache/kafka/blob/083e11a22ca9966ed010acdd5705351ab4300b52/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L87-L89) in the code the other day, which gives the impression that it is valid to serialize a non-null value to null bytes: > // Serializing non-null values to null can be useful when working with Optional-like values > // where the Optional.empty case is serialized to null. > // See the discussion here: https://github.com/apache/kafka/pull/7679 Is this no longer true?
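The case raised in this comment is a value object that is non-null but whose serialized bytes are null, such as an `Optional.empty()` mapped to `null`. A small sketch makes the distinction concrete: if deletion is decided by the serialized bytes, such a value is treated as a delete even though `value != null`. This is hypothetical illustration code, not Kafka's `Serializer` API; `NullBytesSketch`, `OPTIONAL_SERIALIZER` and `isDelete` are invented names, and the sketch does not settle which behavior the versioned store should adopt.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical sketch: a non-null value whose serialized form is null. */
class NullBytesSketch {

    /** Optional.empty() serializes to null bytes; a present value serializes as UTF-8. */
    static final Function<Optional<String>, byte[]> OPTIONAL_SERIALIZER =
        opt -> opt.map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);

    /**
     * One reading of the proposed put() javadoc: the write is a delete whenever the
     * serialized bytes are null, whether or not the value object itself was null.
     */
    static <V> boolean isDelete(V value, Function<V, byte[]> serializer) {
        byte[] bytes = value == null ? null : serializer.apply(value);
        return bytes == null;
    }
}
```

Under this reading `isDelete(Optional.empty(), OPTIONAL_SERIALIZER)` is `true`, which is exactly the behavior the reviewer is asking about.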
[jira] [Commented] (KAFKA-14696) CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL JAAS JndiLoginModule configuration using Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-14696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686696#comment-17686696 ] Manikumar commented on KAFKA-14696: --- There are no plans to provide a patch for older versions (< 3.4.0). We have given some advice as part of the CVE announcement: https://lists.apache.org/thread/vy1c7fqcdqvq5grcqp6q5jyyb302khyz > CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL > JAAS JndiLoginModule configuration using Kafka Connect > --- > > Key: KAFKA-14696 > URL: https://issues.apache.org/jira/browse/KAFKA-14696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.8.1, 2.8.2 >Reporter: MillieZhang >Priority: Major > Fix For: 3.4.0 > > > CVE Reference: [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-34917] > > Will Kafka 2.8.X provide a patch to fix this vulnerability? > If yes, when will the patch be provided? > > Thanks
[GitHub] [kafka] guozhangwang merged pull request #13167: KAFKA-14650: Synchronize access to tasks inside task manager
guozhangwang merged PR #13167: URL: https://github.com/apache/kafka/pull/13167
[jira] [Assigned] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio reassigned KAFKA-14693: -- Assignee: José Armando García Sancio > KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown > - > > Key: KAFKA-14693 > URL: https://issues.apache.org/jira/browse/KAFKA-14693 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.4.0 >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Critical > Fix For: 3.4.1 > > > h1. Problem > When the KRaft controller encounters an error that it cannot handle, it calls > {{ProcessExitingFaultHandler}}, which calls {{Exit.exit}}, which calls > {{Runtime.exit}}. > Based on the Runtime.exit documentation: > {quote}All registered [shutdown > hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-], > if any, are started in some unspecified order and allowed to run > concurrently until they finish. Once this is done the virtual machine > [halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-]. > {quote} > One of the shutdown hooks registered by Kafka is {{Server.shutdown()}}. > This shutdown hook eventually calls {{KafkaEventQueue.close}}. This last > close method joins on the controller thread. Unfortunately, the controller > thread is itself blocked in a join, waiting for the shutdown hook thread to finish.
> Here are sample thread stacks: > {code:java} >"QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms > elapsed=620807.49s allocated=38544M defined_classes=353 > tid=0x7f5aeb31f800 nid=0x80c in Object.wait() [0x7f5a658fb000] > java.lang.Thread.State: WAITING (on object monitor) > > > at java.lang.Object.wait(java.base@17.0.5/Native > Method) > - waiting on > at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304) > - locked <0xa29241f8> (a > org.apache.kafka.common.utils.KafkaThread) > at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372) > at > java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107) > at > java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46) > at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130) > at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173) > - locked <0xffe020b8> (a java.lang.Class for > java.lang.Shutdown) > at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115) > at java.lang.System.exit(java.base@17.0.5/System.java:1860) > at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43) > at org.apache.kafka.common.utils.Exit.exit(Exit.java:66) > at org.apache.kafka.common.utils.Exit.exit(Exit.java:62) > at > org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874) > at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969){code} > and > {code:java} > "kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s > allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in > Object.wait() [0x7f5a47afd000] > java.lang.Thread.State: WAITING (on object monitor) > at
java.lang.Object.wait(java.base@17.0.5/Native Method) > - waiting on > at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304) > - locked <0xa272bcb0> (a > org.apache.kafka.common.utils.KafkaThread) > at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372) > at > org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509) > at > org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553) > at > kafka.server.ControllerServer.shutdown(ControllerServer.scala:521) > at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184) > at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99) > at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown > Source) > at kafk
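The two stacks above form a wait cycle: the event-handler thread is inside `Runtime.exit`, waiting for the shutdown hooks to finish, while the `kafka-shutdown-hook` thread is inside `KafkaEventQueue.close`, joining the event-handler thread. The hypothetical simulation below reproduces that cycle with plain threads (nothing calls `System.exit`) and shows that bounding either join breaks it. The bounded join is only an illustration and is not the fix adopted for this issue. Calling `Runtime.halt`, which does not run shutdown hooks, is another way the cycle could be avoided. `ShutdownJoinCycleSketch`, `simulate` and `joinQuietly` are invented names; the stuck threads are daemons, so a deadlocked run leaks them without keeping the JVM alive.

```java
/**
 * Hypothetical simulation of the KAFKA-14693 wait cycle using plain threads.
 * Nothing here exits the JVM; the two threads only stand in for the real ones.
 */
class ShutdownJoinCycleSketch {

    /**
     * Returns true if both simulated threads finish before deadlineMs elapses.
     *
     * @param hookJoinTimeoutMs how long the simulated hook waits for the event thread;
     *                          0 means wait forever, which is what Thread.join(0) does
     */
    static boolean simulate(long hookJoinTimeoutMs, long deadlineMs) {
        final Thread[] eventThread = new Thread[1];

        // Stands in for "kafka-shutdown-hook": closing the event queue joins the event thread.
        Thread hook = new Thread(() -> joinQuietly(eventThread[0], hookJoinTimeoutMs), "simulated-shutdown-hook");
        hook.setDaemon(true);

        // Stands in for "QuorumControllerEventHandler": Runtime.exit starts the hooks and
        // then waits for them, so the fault-handling thread blocks until the hook finishes.
        eventThread[0] = new Thread(() -> {
            hook.start();
            joinQuietly(hook, 0L);
        }, "simulated-event-handler");
        eventThread[0].setDaemon(true);

        eventThread[0].start();
        joinQuietly(eventThread[0], deadlineMs);
        return !eventThread[0].isAlive() && !hook.isAlive();
    }

    private static void joinQuietly(Thread thread, long timeoutMs) {
        try {
            thread.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With `simulate(0L, 300L)` both joins are unbounded and the call reports that the threads are still stuck. With `simulate(100L, 5000L)` the hook gives up after about 100 ms, the event thread's join returns, and both threads finish.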
[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
junrao commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1101875350 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: Thanks, Satish. Agreed. Since this one will be implemented as async eventually, there is probably no need to set LogContext. We can keep this as static. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101869241 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface PartitionAssignor { + Review Comment: I saw that we don't define the POJOs here in your comment -- to confirm, we no longer want to do this -- and it's not something we plan to change in the future?
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101869241 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface PartitionAssignor { + Review Comment: I saw that we don't define the POJOs here in your comment -- to confirm, we no longer want to do this?
[GitHub] [kafka] mumrah merged pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
mumrah merged PR #13183: URL: https://github.com/apache/kafka/pull/13183
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101863484 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side partition assignor used by the GroupCoordinator. Review Comment: The difference here is the packages they will be in?
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101861603 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupAssignment.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; + +/** + * The partition assignment for a consumer group. + */ +public class GroupAssignment { +/** + * The member assignments keyed by member id. + */ +final Map members; + +public GroupAssignment( +Map members +) { +Objects.requireNonNull(members); +this.members = members; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; + +GroupAssignment that = (GroupAssignment) o; + +return members.equals(that.members); Review Comment: Does this line just cover the above lines? Ditto for MemberAssignment.
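One way to read the question above: `members.equals(that.members)` does not make the two earlier checks redundant. The `this == o` check is only a shortcut, but the null/class check guards the cast, since without it `equals(null)` and `equals("x")` would throw instead of returning `false`. The minimal, dependency-free illustration below assumes a hypothetical `GroupAssignmentSketch` class whose assignment values are plain strings rather than `MemberAssignment` objects.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for GroupAssignment: member id -> assignment, with the
// assignment modelled as a plain String to keep the sketch dependency-free.
final class GroupAssignmentSketch {
    final Map<String, String> members;

    GroupAssignmentSketch(Map<String, String> members) {
        this.members = Objects.requireNonNull(members);
    }

    @Override
    public boolean equals(Object o) {
        // Shortcut only: the same reference is trivially equal.
        if (this == o) return true;
        // Not redundant: guards the cast below. Without it, equals(null) would throw
        // NullPointerException and equals("x") would throw ClassCastException.
        if (o == null || getClass() != o.getClass()) return false;
        GroupAssignmentSketch that = (GroupAssignmentSketch) o;
        // Reached only for two instances of exactly the same class.
        return members.equals(that.members);
    }

    @Override
    public int hashCode() {
        // Consistent with equals(): equal maps produce equal hash codes.
        return members.hashCode();
    }
}
```

The same reasoning would apply to a `MemberAssignment`-style class: the map comparison is the only content check, and the preceding lines decide whether a content check is even possible.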
[GitHub] [kafka] mumrah commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
mumrah commented on code in PR #13183: URL: https://github.com/apache/kafka/pull/13183#discussion_r1101861663 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -274,11 +310,6 @@ public void run() throws Exception { new EventQueue.DeadlineFunction(deadline), new PollEvent()); } - Review Comment: The base MigrationEvent will do the error logging. I was just removing some redundant overrides.
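The sketch below illustrates the pattern described in this reply, using hypothetical names (`MigrationEventSketch`, `PollEventSketch`) loosely modelled on the `run()`/`handleException()` pair of Kafka's event-queue events. The shared base class handles failures once, so a subclass that re-implements the same logging is a redundant override and can be deleted. Errors are collected in a list instead of going to a real logger so the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base event: owns the error handling so concrete events only implement run().
abstract class MigrationEventSketch {
    // Stand-in for a logger; collected so the behaviour can be inspected.
    static final List<String> ERRORS = new ArrayList<>();

    abstract void run() throws Exception;

    // Shared error handling inherited by every subclass.
    void handleException(Throwable e) {
        ERRORS.add(getClass().getSimpleName() + " failed: " + e.getMessage());
    }

    // Simplified stand-in for an event queue invoking a single event.
    static void execute(MigrationEventSketch event) {
        try {
            event.run();
        } catch (Throwable e) {
            event.handleException(e);
        }
    }
}

// No handleException() override: one here would only duplicate the base class.
final class PollEventSketch extends MigrationEventSketch {
    @Override
    void run() throws Exception {
        throw new IllegalStateException("ZK unavailable");
    }
}
```

A subclass would only need its own `handleException()` if it had to react differently from the default, for example by scheduling a retry.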
[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface
jolshan commented on code in PR #13202: URL: https://github.com/apache/kafka/pull/13202#discussion_r1101847493 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.Map; +import java.util.Objects; + +/** + * The assignment specification for a consumer group. + */ +public class AssignmentSpec { +/** + * The members keyed by member id. + */ +final Map members; + +/** + * The topics' metadata keyed by topic id + */ +final Map topics; + +public AssignmentSpec( +Map members, +Map topics +) { +Objects.requireNonNull(members); +Objects.requireNonNull(topics); + Review Comment: Extremely small nit: any reason why we include a newline here but not in the other specs?
[jira] [Commented] (KAFKA-6793) Unnecessary warning log message
[ https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686677#comment-17686677 ] Philip Nee commented on KAFKA-6793: --- We're also observing the same issue. > Unnecessary warning log message > > > Key: KAFKA-6793 > URL: https://issues.apache.org/jira/browse/KAFKA-6793 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Anna O >Assignee: Philip Nee >Priority: Minor > > When upgraded KafkaStreams from 0.11.0.2 to 1.1.0 the following warning log > started to appear: > level: WARN > logger: org.apache.kafka.clients.consumer.ConsumerConfig > message: The configuration 'admin.retries' was supplied but isn't a known > config. > The config is not explicitly supplied to the streams. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] akhileshchg commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
akhileshchg commented on code in PR #13183: URL: https://github.com/apache/kafka/pull/13183#discussion_r1101829449 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -274,11 +310,6 @@ public void run() throws Exception { new EventQueue.DeadlineFunction(deadline), new PollEvent()); } - Review Comment: Why are we not handling exceptions in other events?
[GitHub] [kafka] philipnee opened a new pull request, #13225: KAFKA-6793
philipnee opened a new pull request, #13225: URL: https://github.com/apache/kafka/pull/13225 Reviving this old discussion: the original PR was closed, so I opened a new one. **Issue**: We have recently received complaints that this warning is too noisy, so we want to lower it to debug level. The original PR was closed in: https://issues.apache.org/jira/browse/KAFKA-6793 **Questions** Is this log useful at all? Should we just remove it?
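To make the issue concrete, here is a hedged, dependency-free sketch of the check behind this warning. Supplied keys that are not in the client's set of known config names get reported, and the proposal amounts to lowering the level at which that report is logged. `UnknownConfigReporter` is a hypothetical helper, `java.util.logging` stands in for SLF4J, and `Level.FINE` plays the role of DEBUG. This is not the actual `ConsumerConfig` code.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: reports supplied config keys that are not known config names.
final class UnknownConfigReporter {
    private static final Logger LOG = Logger.getLogger(UnknownConfigReporter.class.getName());

    // Supplied keys minus known keys, sorted for stable output.
    static Set<String> unknownKeys(Map<String, ?> supplied, Set<String> known) {
        Set<String> unknown = new TreeSet<>(supplied.keySet());
        unknown.removeAll(known);
        return unknown;
    }

    // Logs each unknown key at the given level (e.g. WARNING today, FINE if lowered).
    static Set<String> report(Map<String, ?> supplied, Set<String> known, Level level) {
        Set<String> unknown = unknownKeys(supplied, known);
        for (String key : unknown) {
            // MessageFormat syntax: '' renders a single quote, {0} is the key.
            LOG.log(level, "The configuration ''{0}'' was supplied but isn''t a known config.", key);
        }
        return unknown;
    }
}
```

With the default `java.util.logging` configuration, messages at `FINE` are not printed, which mirrors the effect of moving the warning to debug level.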
[GitHub] [kafka] dmariassy opened a new pull request, #13224: KAFKA-14700: Produce request interceptors
dmariassy opened a new pull request, #13224: URL: https://github.com/apache/kafka/pull/13224 First stab at implementing produce request interceptors as defined in KIP-905 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-6793) Unnecessary warning log message
[ https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-6793: - Assignee: Philip Nee > Unnecessary warning log message > > > Key: KAFKA-6793 > URL: https://issues.apache.org/jira/browse/KAFKA-6793 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Anna O >Assignee: Philip Nee >Priority: Minor > > When upgraded KafkaStreams from 0.11.0.2 to 1.1.0 the following warning log > started to appear: > level: WARN > logger: org.apache.kafka.clients.consumer.ConsumerConfig > message: The configuration 'admin.retries' was supplied but isn't a known > config. > The config is not explicitly supplied to the streams.
[jira] [Created] (KAFKA-14700) Produce request interceptors
David Mariassy created KAFKA-14700: -- Summary: Produce request interceptors Key: KAFKA-14700 URL: https://issues.apache.org/jira/browse/KAFKA-14700 Project: Kafka Issue Type: Improvement Components: core Reporter: David Mariassy Assignee: David Mariassy See KIP-905
[GitHub] [kafka] fvaleri commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
fvaleri commented on PR #13214: URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424569047 @showuon if you have some time, this looks almost ready.
[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
fvaleri commented on code in PR #13214: URL: https://github.com/apache/kafka/pull/13214#discussion_r1101811769 ## tools/src/main/java/org/apache/kafka/tools/MessageReader.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.InputStream; +import java.util.Properties; + +/** + * Typical implementations of this interface convert data from an {@link InputStream} received via + * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each + * invocation of `{@link MessageReader#readMessage()}`. + * + * This is used by the {@link ConsoleProducer}. + */ +public interface MessageReader { Review Comment: This would be a breaking change, because there is an option that allows users to provide their own message reader implementation.
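For context on why this interface is user-facing, here is a sketch of the kind of custom reader a user might plug in. To stay compilable without Kafka on the classpath, it mirrors the `init(InputStream, Properties)` / `readMessage()` shape using hypothetical types (`SimpleMessageReader`, `KeyValue`, `LineReaderSketch`) instead of `ProducerRecord`. The `parse.key` and `key.separator` property names follow the console producer's line reader, but the parsing logic here is a simplified assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical minimal record type used instead of ProducerRecord.
final class KeyValue {
    final String key;   // null when keys are not parsed
    final String value;

    KeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical mirror of the reviewed interface's shape.
interface SimpleMessageReader {
    void init(InputStream input, Properties props);

    // Returns the next record, or null once the input is exhausted.
    KeyValue readMessage();
}

// A user-supplied implementation: one record per line, optionally split into key and value.
final class LineReaderSketch implements SimpleMessageReader {
    private BufferedReader reader;
    private boolean parseKey;
    private String separator;

    @Override
    public void init(InputStream input, Properties props) {
        reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        parseKey = Boolean.parseBoolean(props.getProperty("parse.key", "false"));
        separator = props.getProperty("key.separator", "\t");
    }

    @Override
    public KeyValue readMessage() {
        String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (line == null) return null;
        if (!parseKey) return new KeyValue(null, line);
        int idx = line.indexOf(separator);
        if (idx < 0) throw new IllegalArgumentException("No key separator found on line: " + line);
        return new KeyValue(line.substring(0, idx), line.substring(idx + separator.length()));
    }
}
```

A producer loop would call `readMessage()` until it returns `null` and send each result, so any change to this contract would break implementations like this one that users pass in by class name.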
[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
fvaleri commented on code in PR #13214: URL: https://github.com/apache/kafka/pull/13214#discussion_r1101716594 ## server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java: ## @@ -16,17 +16,29 @@ */ package org.apache.kafka.server.util; +import joptsimple.OptionParser; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.utils.Utils; import java.io.PrintStream; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; +import static java.util.Arrays.stream; + public class ToolsUtils { +public static void validatePortOrDie(OptionParser parser, String hostPort) { Review Comment: ```suggestion public static void validatePortOrExit(OptionParser parser, String hostPort) { ``` ## tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java: ## @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.regex.Pattern; + +import static java.util.Arrays.stream; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG; 
+import static org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.utils.Utils.loadProps; +import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions; +import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs; + +/** + * Sends {@link ProducerRecord} generated from lines read on the standard input. + */ +public class ConsoleProducer { +public static void main(String[] args) { +ConsoleProducer consoleProducer = new ConsoleProducer(); +consoleProducer.start(args); +} + +void start(String[] args) { +try { +ConsoleProducerConfig config = new ConsoleProducerConfig(args); +MessageReader reader = createMessageReader(config); +reader.init(System.in, config.getReaderProps()); + +KafkaProducer producer = createKafkaProducer(config.getProducerProps()); +Exit.addShutdownHook("producer-shutdown-hook", producer::close); + +ProducerRecord record; +do { +record = reader.readMessag
[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1424539164 @mimaison This is a moderately large change in behavior, and if possible, it'd be nice to get another set of eyes on it before merging. We don't need another reviewer for the PR changes (although comments are always welcome); instead, I'd just like confirmation that this change is safe to make as a bug fix. TL;DR: If an upstream consumer group is ahead of the upstream offset for the latest-emitted checkpoint, we will only sync offsets for that consumer group to the downstream cluster based on the offset pair for that checkpoint, instead of adding the delta of (upstream offset for consumer group - upstream offset in checkpoint), since there is no guarantee that that delta will be accurate in cases where the upstream topic is compacted, has transaction markers, or has some records filtered out via SMT.
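The TL;DR above can be expressed as a small worked example. This is a sketch of the rule as described, not MirrorMaker 2's actual translation code. The class and method names are hypothetical, and a checkpoint is reduced to a pair of (upstream offset, downstream offset).

```java
import java.util.OptionalLong;

// Hypothetical sketch of the two translation rules; not MirrorMaker 2's API.
final class OffsetTranslationSketch {

    // Previous behaviour as described: add the upstream delta to the checkpoint's downstream offset.
    static long translateWithDelta(long groupUpstream, long checkpointUpstream, long checkpointDownstream) {
        return checkpointDownstream + (groupUpstream - checkpointUpstream);
    }

    // Behaviour described for the fix: when the group is at or ahead of the checkpoint,
    // sync only to the checkpoint's downstream offset. No delta is added, because
    // compaction, transaction markers, or SMT filtering can make the upstream delta
    // larger than the number of records actually written downstream.
    static OptionalLong translateConservatively(long groupUpstream, long checkpointUpstream, long checkpointDownstream) {
        if (groupUpstream < checkpointUpstream) {
            // Out of scope for this sketch: this checkpoint cannot translate an older offset.
            return OptionalLong.empty();
        }
        return OptionalLong.of(checkpointDownstream);
    }
}
```

For illustration, take a checkpoint pairing upstream offset 100 with downstream offset 80, and a group committed at upstream offset 110. The delta-based rule returns 90. If some of the ten upstream offsets between 100 and 110 were compacted away, held transaction markers, or were filtered by an SMT, fewer than ten records reached the downstream topic, so 90 can point past its end, which shows up as negative lag. The conservative rule returns 80: it stays within what was actually replicated, at the cost of possibly reprocessing some records.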
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1101777913 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.clients.admin.ScramMechanism; + +// import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +// import java.util.function.Consumer; +import java.util.stream.Collectors; + + +/** + * Represents the SCRAM credentials in the metadata image. + * + * This class is thread-safe. 
+ */ +public final class ScramImage { +public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap()); + +private final Map> mechanisms; + +public ScramImage(Map> mechanisms) { +this.mechanisms = Collections.unmodifiableMap(mechanisms); +} + +public void write(ImageWriter writer, ImageWriterOptions options) { +for (Entry> mechanismEntry : mechanisms.entrySet()) { +for (Entry userEntry : mechanismEntry.getValue().entrySet()) { +writer.write(0, userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey())); +} +} +} + +public Map> mechanisms() { +return mechanisms; +} + +public boolean isEmpty() { +return mechanisms.isEmpty(); +} + +@Override +public int hashCode() { +return mechanisms.hashCode(); +} + +@Override +public boolean equals(Object o) { +if (o == null) return false; +if (!o.getClass().equals(ScramImage.class)) return false; +ScramImage other = (ScramImage) o; +return mechanisms.equals(other.mechanisms); +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +builder.append("ScramImage("); +List sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList()); +String preMechanismComma = ""; +for (ScramMechanism mechanism : sortedMechanisms) { +builder.append(preMechanismComma).append(mechanism).append(": {"); +Map userMap = mechanisms.get(mechanism); +List sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList()); +String preUserNameComma = ""; +for (String userName : sortedUserNames) { + builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName)); Review Comment: This one should be okay. It is just printing a list of usernames and which SCRAM mechanism they use. No passwords or salts are displayed here.
[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
omkreddy commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1101769724
## metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java: ##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+    static final int MAX_ITERATIONS = 16384;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        ScramControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            return new ScramControlManager(logContext,
+                snapshotRegistry);
+        }
+    }
+
+    static class ScramCredentialKey {
+        private final String username;
+        private final ScramMechanism mechanism;
+
+        ScramCredentialKey(String username, ScramMechanism mechanism) {
+            this.username = username;
+            this.mechanism = mechanism;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(username, mechanism);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null) return false;
+            if (!(o.getClass() == this.getClass())) return false;
+            ScramCredentialKey other = (ScramCredentialKey) o;
+            return username.equals(other.username) &&
+                mechanism.equals(other.mechanism);
+        }
+
+        @Override
+        public String toString() {
+            return "ScramCredentialKey" +
+                "(username=" + username +
+                ", mechanism=" + mechanism +
+                ")";
+        }
+    }
+
+    static class ScramCredentialValue {
+        private final byte[] salt;
+        private final byte[] saltedPassword;
+        private final int iterations;
+
+        ScramCredentialValue(
+            byte[] salt,
+            byte[] saltedPassword,
+            int iterations
+        ) {
+            this.salt = salt;
+            this.saltedPassword = saltedPassword;
+            this.iterations = iterations;
+        }
+
+        @Override
+        publ
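The `ScramCredentialValue` in this hunk stores a salt, a salted password and an iteration count, capped by `MAX_ITERATIONS = 16384`. As a rough illustration of how those three fields relate, here is a minimal sketch assuming SCRAM-SHA-256 as defined in RFC 5802 and RFC 7677, where `SaltedPassword = Hi(password, salt, i)` is PBKDF2 with HMAC-SHA-256 and a 32-byte output. SASLprep normalization is skipped, and the class name `ScramSaltSketch`, the 4096 lower bound (an RFC 7677 recommendation) and the `acceptableIterations` check are hypothetical; the truncated hunk does not show how the PR actually validates iterations.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

public class ScramSaltSketch {
    // Upper bound copied from the hunk above (ScramControlManager.MAX_ITERATIONS).
    static final int MAX_ITERATIONS = 16384;
    // Assumption: RFC 7677 recommends SCRAM-SHA-256 iteration counts of at least 4096.
    static final int MIN_ITERATIONS = 4096;

    /**
     * SCRAM Hi(password, salt, i) from RFC 5802, i.e. PBKDF2 with HMAC-SHA-256 and a
     * 32-byte (256-bit) output. SASLprep normalization of the password is skipped here.
     */
    static byte[] saltedPassword(String password, byte[] salt, int iterations) {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec)
                .getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 derivation failed", e);
        } finally {
            spec.clearPassword();
        }
    }

    /** Hypothetical bounds check a controller could apply before accepting a credential. */
    static boolean acceptableIterations(int iterations) {
        return iterations >= MIN_ITERATIONS && iterations <= MAX_ITERATIONS;
    }

    /** Lower-case hex encoding, used only to print and compare digests. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] salt = "salt".getBytes(StandardCharsets.UTF_8);
        int iterations = 4096;
        System.out.println("acceptable: " + acceptableIterations(iterations));
        System.out.println("saltedPassword: " + hex(saltedPassword("pencil", salt, iterations)));
    }
}
```

Only the salted password (never the plaintext) needs to be stored alongside the salt and iteration count, which is why the value class carries exactly those three fields.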
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1101766963
## metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java: ##
@@ -0,0 +1,307 @@
[jira] [Commented] (KAFKA-14674) Backport fix for KAFKA-14455 to 3.3 and 3.4 branches
[ https://issues.apache.org/jira/browse/KAFKA-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686648#comment-17686648 ] Marcel Stör commented on KAFKA-14674: - I wasn't able to find any EOL information on the 2.x release line. Hence, I assume it _is_ EOL and this fix won't be backported to 2.8.2? > Backport fix for KAFKA-14455 to 3.3 and 3.4 branches > > > Key: KAFKA-14674 > URL: https://issues.apache.org/jira/browse/KAFKA-14674 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.4.1, 3.3.3 > > > The 3.4 branch is currently frozen for the upcoming 3.4.0 release. Once the > release is complete, we can and should backport the change in > [https://github.com/apache/kafka/pull/12984] onto the 3.3 and 3.4 branches > before putting out any subsequent releases on those branches.
[jira] [Assigned] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-14698: Assignee: Akhilesh Chaganti > Received request api key LEADER_AND_ISR which is not enabled > > > Key: KAFKA-14698 > URL: https://issues.apache.org/jira/browse/KAFKA-14698 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.4.0 >Reporter: Mickael Maison >Assignee: Akhilesh Chaganti >Priority: Major > Fix For: 3.5.0, 3.4.1 > > Attachments: broker0.log, controller.log, test_online_migration.tar.gz > > > I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a > single topic "test" with 2 partitions and 2 replicas, plus the internal > __consumer_offsets topic. > While following the ZooKeeper to KRaft migration steps from > [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting > issues at the "Migrating brokers to KRaft" step. > When I restart a broker as KRaft, it repeatedly prints the following error: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > [2023-02-09 16:14:30,334] ERROR Closing socket for > 192.168.1.11:9092-192.168.1.11:63737-371 because of error > (kafka.network.Processor) > {code} > The controller repeatedly prints the following errors: > {code:java} > [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] > Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client > requested connection close from node 0 > (org.apache.kafka.clients.NetworkClient) > [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 > disconnected. (org.apache.kafka.clients.NetworkClient) > {code} > Attached are the controller logs and the logs from broker-0. >
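The broker-side error comes from request gating: each broker accepts only the API keys enabled for its listener type, and LEADER_AND_ISR is a request that a ZooKeeper-mode controller sends to ZooKeeper-mode brokers. A broker restarted in KRaft mode therefore rejects every LISR the migrating controller sends. The sketch below models that check; the `ApiGateSketch` class, its enums and the chosen subset of API keys are hypothetical stand-ins (not Kafka's actual `ApiKeys` or API-version code), and `IllegalArgumentException` stands in for Kafka's `InvalidRequestException` so the example has no Kafka dependency.

```java
import java.util.EnumSet;
import java.util.Set;

public class ApiGateSketch {
    // Hypothetical subset of API keys; Kafka's real list is org.apache.kafka.common.protocol.ApiKeys.
    enum ApiKey { PRODUCE, FETCH, METADATA, LEADER_AND_ISR, UPDATE_METADATA, STOP_REPLICA }

    // Hypothetical listener types: a broker still in ZooKeeper mode vs. one restarted in KRaft mode.
    enum ListenerType { ZK_BROKER, KRAFT_BROKER }

    /** APIs a broker of the given type accepts (illustrative subset only). */
    static Set<ApiKey> enabledApis(ListenerType type) {
        Set<ApiKey> apis = EnumSet.of(ApiKey.PRODUCE, ApiKey.FETCH, ApiKey.METADATA);
        if (type == ListenerType.ZK_BROKER) {
            // Control requests from a ZooKeeper-mode controller are only understood by ZK brokers.
            apis.addAll(EnumSet.of(ApiKey.LEADER_AND_ISR, ApiKey.UPDATE_METADATA, ApiKey.STOP_REPLICA));
        }
        return apis;
    }

    /** Throws for a disabled API; in the logs above the broker then closes the socket. */
    static void checkEnabled(ListenerType type, ApiKey key) {
        if (!enabledApis(type).contains(key)) {
            throw new IllegalArgumentException(
                "Received request api key " + key + " which is not enabled");
        }
    }

    public static void main(String[] args) {
        checkEnabled(ListenerType.ZK_BROKER, ApiKey.LEADER_AND_ISR); // accepted, no exception
        try {
            checkEnabled(ListenerType.KRAFT_BROKER, ApiKey.LEADER_AND_ISR);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the rejection happens on every LISR, the broker logs the ERROR repeatedly, which matches the comments below that the errors are noisy but do not block the migration.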
[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2
C0urante commented on PR #13137: URL: https://github.com/apache/kafka/pull/13137#issuecomment-1424428493 Whoops--there was a typo in the title at the time of merge. It should refer to [KAFKA-10586](https://issues.apache.org/jira/browse/KAFKA-10586), not KAFKA-15086.
[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686623#comment-17686623 ] David Arthur commented on KAFKA-14698: -- Yea, I agree. Any ERROR is pretty concerning from an operator perspective.
[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686618#comment-17686618 ] Mickael Maison commented on KAFKA-14698: Yeah, it seems it's just noise in the end, but if you don't know that, I don't expect anyone to go forward with the migration when both the controllers and the brokers are spamming errors!
[GitHub] [kafka] C0urante closed pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.
C0urante closed pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation. URL: https://github.com/apache/kafka/pull/8656
[GitHub] [kafka] C0urante commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.
C0urante commented on PR #8656: URL: https://github.com/apache/kafka/pull/8656#issuecomment-1424416521 Closing as this issue has been addressed by https://github.com/apache/kafka/pull/13137 / [KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686615#comment-17686615 ] David Arthur commented on KAFKA-14698: -- Thanks [~mimaison]. I've uploaded the logs from the zk migration system test which also shows this problem. If you continue to migrate ZK brokers, do you see other problems besides the ERROR logs? I suspect this is just a superficial problem and not a correctness issue. cc [~showuon]
[jira] [Updated] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14698: - Attachment: test_online_migration.tar.gz
[jira] [Resolved] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
[ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-9981. -- Fix Version/s: 3.5.0 Resolution: Fixed > Running a dedicated mm2 cluster with more than one nodes,When the > configuration is updated the task is not aware and will lose the update > operation. > > > Key: KAFKA-9981 > URL: https://issues.apache.org/jira/browse/KAFKA-9981 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: victor >Assignee: Chris Egerton >Priority: Major > Fix For: 3.5.0 > > > DistributedHerder.reconfigureConnector handles a config update as follows: > {code:java} > if (changed) { > List<Map<String, String>> rawTaskProps = reverseTransform(connName, > configState, taskProps); > if (isLeader()) { > configBackingStore.putTaskConfigs(connName, rawTaskProps); > cb.onCompletion(null, null); > } else { > // We cannot forward the request on the same thread because this > reconfiguration can happen as a result of connector > // addition or removal. If we blocked waiting for the response from > leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() { > @Override > public void run() { > try { > String leaderUrl = leaderUrl(); > if (leaderUrl == null || leaderUrl.trim().isEmpty()) { > cb.onCompletion(new ConnectException("Request to > leader to " + > "reconfigure connector tasks failed " + > "because the URL of the leader's REST > interface is empty!"), null); > return; > } > String reconfigUrl = RestServer.urlJoin(leaderUrl, > "/connectors/" + connName + "/tasks"); > log.trace("Forwarding task configurations for connector > {} to leader", connName); > RestClient.httpRequest(reconfigUrl, "POST", null, > rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); > cb.onCompletion(null, null); > } catch (ConnectException e) { > log.error("Request to leader to reconfigure connector > tasks failed", e); > cb.onCompletion(e, null); > } > } > }); > } > } > {code} > The KafkaConfigBackingStore task checks for configuration updates, such as a topic > whitelist update. If the KafkaConfigBackingStore task is not running on the leader > node, an HTTP request is sent to notify the leader of the configuration > update. However, a dedicated mm2 cluster does not have the HTTP server turned > on, so the request fails to be sent, causing the update operation to be > lost.
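The quoted branch reduces to a three-way decision on the worker that regenerates the task configs. A minimal sketch, with hypothetical names (`TaskConfigForwardingSketch`, `Outcome`, `reconfigUrl`) rather than the real `DistributedHerder` API, makes the lost-update case explicit. It models only the empty-leader-URL branch; an unreachable URL would instead fail inside the `catch (ConnectException e)` branch, with the same net effect that the new task configs are never written.

```java
public class TaskConfigForwardingSketch {
    enum Outcome { WRITTEN_BY_LEADER, FORWARDED_TO_LEADER, LOST_NO_LEADER_URL }

    /** Hypothetical model of the if/else in reconfigureConnector quoted above. */
    static Outcome handleChangedTaskConfigs(boolean isLeader, String leaderUrl) {
        if (isLeader) {
            // The leader writes the new task configs to the config topic itself.
            return Outcome.WRITTEN_BY_LEADER;
        }
        if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
            // A follower with no leader REST URL has nowhere to send the update:
            // the callback receives an error and the new task configs are dropped.
            return Outcome.LOST_NO_LEADER_URL;
        }
        // Otherwise the follower POSTs the configs to the leader's REST endpoint.
        return Outcome.FORWARDED_TO_LEADER;
    }

    /** Hypothetical join of the leader URL and the tasks path, avoiding a double slash. */
    static String reconfigUrl(String leaderUrl, String connName) {
        String path = "/connectors/" + connName + "/tasks";
        return leaderUrl.endsWith("/") ? leaderUrl + path.substring(1) : leaderUrl + path;
    }

    public static void main(String[] args) {
        System.out.println(handleChangedTaskConfigs(false, null));
        System.out.println(reconfigUrl("http://leader:8083/", "MirrorSourceConnector"));
    }
}
```

The fix referenced elsewhere in this thread (PR #13137 / KIP-710) removes the third outcome by giving dedicated MirrorMaker 2 nodes a way to reach the leader.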
[jira] [Updated] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14698: - Fix Version/s: 3.5.0 3.4.1
[GitHub] [kafka] C0urante merged pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2
C0urante merged PR #13137: URL: https://github.com/apache/kafka/pull/13137
[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686614#comment-17686614 ] Mickael Maison commented on KAFKA-14698: Ignoring the errors and continuing the migration steps seems to work. Once I restarted the controllers without migration enabled, the errors stopped.
[jira] [Commented] (KAFKA-14697) KRaft controller should not send LISR to KRaft brokers during migration
[ https://issues.apache.org/jira/browse/KAFKA-14697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686613#comment-17686613 ] Mickael Maison commented on KAFKA-14697: Ignoring the errors and continuing the migration steps seems to work. Once I restarted the controllers without migration enabled, the errors stopped. > KRaft controller should not send LISR to KRaft brokers during migration > --- > > Key: KAFKA-14697 > URL: https://issues.apache.org/jira/browse/KAFKA-14697 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.4.0 >Reporter: David Arthur >Priority: Major > Attachments: test_online_migration.tar.gz > > > During a migration, after a broker is restarted in KRaft mode the KRaft > controller is still sending it LISR as if it were a ZK broker. This results > in errors like: > {code} > [2023-02-09 14:53:26,892] ERROR Closing socket for > 172.19.0.4:9092-172.19.0.9:32876-0 because of error (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > {code} > This can be observed by running the > kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration > ducktape test. Attached are logs from a local run of this test with --debug > set. > It seems these errors do not affect the consistency of metadata during the > migration, but the excessive LISR sent out will impact performance on large > clusters.
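The issue title implies the remedy: during the migration, the KRaft controller should address LeaderAndIsr only to brokers that are still running in ZooKeeper mode, and leave KRaft brokers to learn partition state from the metadata log. A minimal sketch under stated assumptions: the controller is assumed to know which brokers have already been restarted in KRaft mode, and the `LisrTargetsSketch` class, its `Broker` type and the `runningKRaft` flag are hypothetical, not Kafka's broker-registration model or the eventual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LisrTargetsSketch {
    /** Hypothetical view of a broker during a ZooKeeper-to-KRaft migration. */
    static final class Broker {
        final int id;
        final boolean runningKRaft;

        Broker(int id, boolean runningKRaft) {
            this.id = id;
            this.runningKRaft = runningKRaft;
        }
    }

    /** Brokers that should receive LeaderAndIsr: only those still running in ZooKeeper mode. */
    static List<Integer> lisrTargets(List<Broker> brokers) {
        List<Integer> targets = new ArrayList<>();
        for (Broker broker : brokers) {
            if (!broker.runningKRaft) {
                targets.add(broker.id);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Broker 0 has been restarted in KRaft mode; brokers 1 and 2 are still ZK brokers.
        List<Broker> brokers = Arrays.asList(
            new Broker(0, true), new Broker(1, false), new Broker(2, false));
        System.out.println(lisrTargets(brokers)); // [1, 2]
    }
}
```

Filtering at the sender addresses both symptoms raised in the issue: the KRaft brokers stop logging rejected requests, and the controller stops sending LISRs that can never succeed, which matters on large clusters.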