[jira] [Created] (KAFKA-14704) Follower should truncate before incrementing high watermark

2023-02-09 Thread David Jacot (Jira)
David Jacot created KAFKA-14704:
---

 Summary: Follower should truncate before incrementing high watermark
 Key: KAFKA-14704
 URL: https://issues.apache.org/jira/browse/KAFKA-14704
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lookingformira opened a new pull request, #13229: 3.2

2023-02-09 Thread via GitHub


lookingformira opened a new pull request, #13229:
URL: https://github.com/apache/kafka/pull/13229

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14513) Add broker side PartitionAssignor interface

2023-02-09 Thread David Jacot (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-14513.
-
Resolution: Fixed

> Add broker side PartitionAssignor interface
> ---
>
> Key: KAFKA-14513
> URL: https://issues.apache.org/jira/browse/KAFKA-14513
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>






[GitHub] [kafka] dajac merged pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac merged PR #13202:
URL: https://github.com/apache/kafka/pull/13202





[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )

2023-02-09 Thread via GitHub


calmera commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1102355418


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -54,7 +54,7 @@
  * The consumer configuration keys
  */
 public class ConsumerConfig extends AbstractConfig {
-private static final ConfigDef CONFIG;
+protected static final ConfigDef CONFIG;

Review Comment:
   Hmm, looks like my reverse consumer work got committed by accident. I will clean it up and go through your comments.
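
For context on why this one-line diff matters: widening `CONFIG` from `private` to `protected` lets a subclass of `ConsumerConfig` see and extend the base `ConfigDef`. A minimal hypothetical sketch of what that enables (the subclass and the extra key below are illustrative only, not part of this PR):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical subclass: referencing CONFIG here only compiles because the
// diff above makes it protected instead of private.
public class ExtendedConsumerConfig extends ConsumerConfig {

    // illustrative extra key, not a real Kafka configuration
    public static final String EXAMPLE_EXTRA_CONFIG = "example.extra.config";

    // copy the base consumer ConfigDef and layer an extra key on top of it
    protected static final ConfigDef EXTENDED_CONFIG = new ConfigDef(CONFIG)
        .define(EXAMPLE_EXTRA_CONFIG,
                ConfigDef.Type.STRING,
                "default",
                ConfigDef.Importance.LOW,
                "Illustrative key layered on top of the base consumer config.");

    public ExtendedConsumerConfig(final Map<String, Object> props) {
        super(props); // regular ConsumerConfig construction; EXTENDED_CONFIG is shown for illustration
    }
}
```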






[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102262205


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -383,6 +389,114 @@ public void shouldDistinguishEmptyAndNull() {
 verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
 }
 
+@Test
+public void shouldRestore() {
+final List<DataRecord> records = new ArrayList<>();
+records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+}
+
+@Test
+public void shouldRestoreWithNulls() {
+final List<DataRecord> records = new ArrayList<>();
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetNullFromStore("k");
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+}
+
+@Test
+public void shouldRestoreWithNullsAndRepeatTimestamps() {
+final List<DataRecord> records = new ArrayList<>();
+records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 1));
+records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 1));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 5));
+records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));

Review Comment:
   Similar to 5/N -- hard to keep track without writing it down. Can we add some comments?



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102260418


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-throw new UnsupportedOperationException("not yet implemented");
+// advance stream time to the max timestamp in the batch
+for (final ConsumerRecord<byte[], byte[]> record : records) {
+observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+}
+
+final VersionedStoreClient restoreClient = restoreWriteBuffer.getClient();

Review Comment:
   I see -- fair enough (in general, I am a fan of strong typing whenever possible, but it's ok I guess).
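
The first step in the `restoreBatch()` diff above -- advancing the observed stream time to the largest timestamp in the batch -- can be read in isolation. A small standalone restatement of that bookkeeping (the class name here is made up; only the loop mirrors the diff):

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative helper mirroring the stream-time bookkeeping in the diff above:
// stream time only ever moves forward, to the largest record timestamp seen so far.
final class StreamTimeTracker {

    private long observedStreamTime = Long.MIN_VALUE;

    void advance(final Collection<ConsumerRecord<byte[], byte[]>> records) {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
        }
    }

    long observedStreamTime() {
        return observedStreamTime;
    }
}
```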






[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102259793


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+dbSegment.addTo
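
The truncated `flush()` above collects buffered entries into RocksDB `WriteBatch`es before writing them out. For readers unfamiliar with that API, a minimal standalone sketch of the batched-write pattern it relies on (the path and keys are illustrative, not from the PR):

```java
import java.nio.charset.StandardCharsets;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WriteBatchSketch {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final RocksDB db = RocksDB.open(options, "/tmp/write-batch-sketch");
             final WriteBatch batch = new WriteBatch();
             final WriteOptions writeOptions = new WriteOptions()) {
            // collect several puts into a single batch...
            batch.put("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
            batch.put("k2".getBytes(StandardCharsets.UTF_8), "v2".getBytes(StandardCharsets.UTF_8));
            // ...and apply them to the store in one write call
            db.write(writeOptions, batch);
        }
    }
}
```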

[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102257544


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ * associated with the same key, and is implicitly associated with the record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part of an ongoing
+// put() procedure. any value which is not a valid record timestamp will do.
+private static
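
To make the validFrom/validTo description above concrete, here is a tiny self-contained sketch (not from the PR) that models only the lookup semantics: a record is valid from its own timestamp until the next record's timestamp, and a timestamped get returns the version that was current at the queried time. Tombstones, segments, and history retention are deliberately ignored.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of the validFrom/validTo semantics described in the Javadoc above.
// This is NOT the RocksDB-backed implementation.
public class ValidityIntervalSketch {

    // validFrom -> value; a version is valid from its own timestamp (inclusive)
    // until the next version's timestamp (exclusive)
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(final long validFrom, final String value) {
        // inserting a newer version implicitly sets the previous version's validTo
        versions.put(validFrom, value);
    }

    // latest value whose validFrom <= asOfTimestamp, or null if none
    String getAsOf(final long asOfTimestamp) {
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(final String[] args) {
        final ValidityIntervalSketch store = new ValidityIntervalSketch();
        store.put(10L, "v1"); // v1 valid from t=10
        store.put(20L, "v2"); // v2 valid from t=20; v1's validTo becomes 20 (exclusive)
        System.out.println(store.getAsOf(15L)); // v1
        System.out.println(store.getAsOf(25L)); // v2
    }
}
```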

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102256035


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBVersionedStoreTest {
+
+private static final String STORE_NAME = "myversionedrocks";
+private static final String METRICS_SCOPE = "versionedrocksdb";
+private static final long HISTORY_RETENTION = 300_000L;
+private static final long SEGMENT_INTERVAL = HISTORY_RETENTION / 3;
+private static final long BASE_TIMESTAMP = 10L;
+private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+private InternalMockProcessorContext context;
+
+private RocksDBVersionedStore store;
+
+@Before
+public void before() {
+context = new InternalMockProcessorContext<>(
+TestUtils.tempDirectory(),
+Serdes.String(),
+Serdes.String(),
+new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+);
+context.setTime(BASE_TIMESTAMP);
+
+store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, HISTORY_RETENTION, SEGMENT_INTERVAL);
+store.init((StateStoreContext) context, store);
+}
+
+@After
+public void after() {
+store.close();
+}
+
+@Test
+public void shouldPutLatest() {
+putToStore("k", "v", BASE_TIMESTAMP);
+putToStore("k", "v2", BASE_TIMESTAMP + 1);
+
+verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);
+}
+
+@Test
+public void shouldPutNullAsLatest() {
+putToStore("k", null, BASE_TIMESTAMP);
+putToStore("k", null, BASE_TIMESTAMP + 1);
+
+verifyGetNullFromStore("k");
+verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
+verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 2);
+}
+
+@Test
+public void shouldPutOlderWithNonNullLatest() {
+putToStore("k", "v", BASE_TIMESTAMP);
+putToStore("k", "v2", BASE_TIMESTAMP - 2);
+putToStore("k", "v1", BASE_TIMESTAMP - 1);
+putToStore("k", "v4", BASE_TIMESTAMP - 4);
+
+verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4);
+}
+
+@Test
+public void shouldPutOlderWithNullLatest() {
+putToStore("k", null, BASE_TIM

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102252265



[GitHub] [kafka] ouyangnengda commented on pull request #13220: Kafka 14253

2023-02-09 Thread via GitHub


ouyangnengda commented on PR #13220:
URL: https://github.com/apache/kafka/pull/13220#issuecomment-1425139081

   Hi @divijvaidya, could you please review this PR?
   Thank you.





[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102248187



[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102246615



[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102246409



[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102245805


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBVersionedStoreTest {

Review Comment:
   Can we add a test with retention time zero (maybe in a follow-up PR)? Having no segments and `segment_interval = 0` (?) seems to be a corner case we should also support.
   
   Btw: it seems we don't test whether `delete()` returns the right "old value"; can we also add some tests for it?
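
A rough sketch of the kind of `delete()` test suggested above, written against the helpers and constants already present in this test class; the assertions assume `delete()` returns the record that was current at the deletion timestamp, as documented in `VersionedKeyValueStore` (this is illustrative, not code from the PR):

```java
@Test
public void shouldReturnOldValueOnDelete() {
    putToStore("k", "v1", BASE_TIMESTAMP);
    putToStore("k", "v2", BASE_TIMESTAMP + 1);

    // delete at a later timestamp; the previously latest record (v2) should be returned
    final VersionedRecord<byte[]> deleted = store.delete(
        new Bytes(STRING_SERIALIZER.serialize(null, "k")), BASE_TIMESTAMP + 2);
    assertThat(STRING_DESERIALIZER.deserialize(null, deleted.value()), equalTo("v2"));
    assertThat(deleted.timestamp(), equalTo(BASE_TIMESTAMP + 1));

    // and the key should now resolve to null as of the deletion timestamp
    verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 2);
}
```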






[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102243061


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBVersionedStoreTest {
+
+private static final String STORE_NAME = "myversionedrocks";
+private static final String METRICS_SCOPE = "versionedrocksdb";
+private static final long HISTORY_RETENTION = 300_000L;
+private static final long SEGMENT_INTERVAL = HISTORY_RETENTION / 3;
+private static final long BASE_TIMESTAMP = 10L;
+private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+private InternalMockProcessorContext context;
+
+private RocksDBVersionedStore store;
+
+@Before
+public void before() {
+context = new InternalMockProcessorContext<>(
+TestUtils.tempDirectory(),
+Serdes.String(),
+Serdes.String(),
+new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+);
+context.setTime(BASE_TIMESTAMP);
+
+store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, HISTORY_RETENTION, SEGMENT_INTERVAL);
+store.init((StateStoreContext) context, store);
+}
+
+@After
+public void after() {
+store.close();
+}
+
+@Test
+public void shouldPutLatest() {
+putToStore("k", "v", BASE_TIMESTAMP);
+putToStore("k", "v2", BASE_TIMESTAMP + 1);
+
+verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);
+}
+
+@Test
+public void shouldPutNullAsLatest() {
+putToStore("k", null, BASE_TIMESTAMP);

Review Comment:
   Should we not first do `putToStore("k", "v", BASE_TIMESTAMP);` to ensure that the `put(null)` really deletes `v`?






[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102242415


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBVersionedStoreTest {
+
+private static final String STORE_NAME = "myversionedrocks";
+private static final String METRICS_SCOPE = "versionedrocksdb";
+private static final long HISTORY_RETENTION = 300_000L;
+private static final long SEGMENT_INTERVAL = HISTORY_RETENTION / 3;
+private static final long BASE_TIMESTAMP = 10L;
+private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+private InternalMockProcessorContext context;
+
+private RocksDBVersionedStore store;
+
+@Before
+public void before() {
+context = new InternalMockProcessorContext<>(
+TestUtils.tempDirectory(),
+Serdes.String(),
+Serdes.String(),
+new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+);
+context.setTime(BASE_TIMESTAMP);
+
+store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, HISTORY_RETENTION, SEGMENT_INTERVAL);
+store.init((StateStoreContext) context, store);
+}
+
+@After
+public void after() {
+store.close();
+}
+
+@Test
+public void shouldPutLatest() {
+putToStore("k", "v", BASE_TIMESTAMP);
+putToStore("k", "v2", BASE_TIMESTAMP + 1);
+
+verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);

Review Comment:
   That's an interesting one... Users can now also query using a "future timestamp" -- nothing wrong with it; just an interesting "side effect" :)






[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102238142


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete
+ * @param timestamp The timestamp for this record version
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+void put(K key, V value, long timestamp);
+
+/**
+ * Delete the value associated with this key from the store, at the 
specified timestamp
+ * (if there is such a value), and return the deleted value.
+ * 
+ * This operation is semantically equivalent to {@link #get(Object, long)} 
#get(key, timestamp))}
+ * followed by {@link #put(Object, Object, long) #put(key, null, 
timestamp)}.
+ *
+ * @param key   The key
+ * @param timestamp The timestamp for this delete
+ * @return The value and timestamp of the latest record associated with 
this key
+ * as of the deletion timestamp (inclusive), or {@code null} if 
any of
+ * (1) the store contains no records for this key, (2) the latest 
record
+ * for this key as of the deletion timestamp is a tombstone, or
+ * (3) the deletion timestamp is older than this store's history 
retention
+ * (i.e., this store no longer contains data for the provided 
timestamp).
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+VersionedRecord<V> delete(K key, long timestamp);
+
+/**
+ * Get the latest (by timestamp) record associated with this key.
+ *
+ * @param key The key to fetch
+ * @return The value and timestamp of the latest record associated with 
this key, or
+ * {@code null} if either (1) the store contains no records for 
this key or (2) the
+ * latest record for this key is a tombstone.
+ * @throws NullPointerException   If null is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
+ */
+VersionedRecord<V> get(K key);
+
+/**
+ * Get the latest record associated with this key with timestamp not 
exceeding the specified
+ * timestamp bound.

Review Comment:
   Fair point.
   
   > particularly because during KIP discussion one person mentioned that these 
terms did not feel intuitive to them
   
   I cannot remember this; personally I always found it very intuitive (also 
because of its use in temporal tables in SQL), but of course this could just be 
me.
   
   I agree that we should not use it only here, but rather introduce it 
"globally" or not use it at all. It's "just" JavaDocs and just easy to change 
in follow-up releases if they cause confusion, so I don't feel strongly about 

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1102235180


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete

Review Comment:
   This PR was controversial... And I am still not a fan of it -> 
https://github.com/apache/kafka/pull/7679#issuecomment-581695959
   
   It potentially breaks stuff -- of course, if users follow the `null<->null` 
and `non-null<->non-null` pattern the PR does not do any harm.
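
   For context, a minimal sketch of the `null<->null` pattern referred to here, i.e. a 
caller treating a null value as a tombstone (illustrative only; the store is assumed to 
be already initialized):

   ```java
   import org.apache.kafka.streams.state.VersionedKeyValueStore;
   import org.apache.kafka.streams.state.VersionedRecord;

   final class TombstonePutExample {
       // Sketch only: a null value whose serialized bytes are also null is
       // interpreted as a delete, so the latest-value lookup then returns null.
       static VersionedRecord<String> putTombstone(
               final VersionedKeyValueStore<String, String> store) {
           store.put("k", "v", 10L);
           store.put("k", null, 11L); // tombstone, per the javadoc above
           return store.get("k");     // expected to be null
       }
   }
   ```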



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1

2023-02-09 Thread via GitHub


guozhangwang commented on PR #13228:
URL: https://github.com/apache/kafka/pull/13228#issuecomment-1425085767

   cc @lucasbru , would update after #13025 is done, and the pausing logic is 
extracted out of the `checkAllUpdatingTaskStates` function.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1

2023-02-09 Thread via GitHub


guozhangwang opened a new pull request, #13228:
URL: https://github.com/apache/kafka/pull/13228

   Should only be reviewed after #13025
   
   1. Added thread-level restoration metrics.
   2. Related unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13227: KAFKA-14693; Kafka node should halt instead of exit

2023-02-09 Thread via GitHub


jsancio opened a new pull request, #13227:
URL: https://github.com/apache/kafka/pull/13227

   Extend the implementation of ProcessTerminatingFaultHandler to support 
calling either Exit.halt or Exit.exit. Change the fault handler used by the 
Controller thread and the KRaft thread to use a halting fault handler.
   
   Those two threads cannot call Exit.exit because Runtime.exit joins on the 
default shutdown hook thread. The shutdown hook thread joins on the controller 
and kraft thread terminating. This causes a deadlock.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-09 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1425058794

   > Out of an abundance of caution, what do you think about targeting your 
mm2-negative-offsets branch with a new PR to address 
[KAFKA-13659](https://issues.apache.org/jira/browse/KAFKA-13659), which can be 
reviewed separately but then merged to trunk in tandem with this PR?
   
   I tried to do this, but it appears that the "stacked PRs" feature for GitHub 
only applies to branches on the same repository, not across forks. If I were 
to open a stacked PR, it would be on my fork, and review comments may be lost 
if I accidentally delete the repository.
   
   In the interest of an atomic merge, and preserving comment history, I've 
added the commits here. To make it easier on your review, I will continue to 
tag the relevant commits with KAFKA-13659, so you can filter during your 
review. I hope this is acceptable, but sorry in advance for the difficult 
review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14565) Interceptor Resource Leak

2023-02-09 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

External issue URL:   (was: https://github.com/apache/kafka/pull/13168)

> Interceptor Resource Leak
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> *AbstractConfig.getConfiguredInstances()*  is delegated responsibility for 
> both creating and configuring each interceptor listed in the 
> interceptor.classes property and returns a configured  
> *List>* interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()* 
>  method with no implementation and check exception on the respective 
> Consumer/Producer interceptor interfaces.  This method as a part the 
> interceptor life cycle management will be responsible for creating threads 
> and/or objects which utilizes threads, connections or other resource which 
> requires clean up.  Additionally, this default method enables implementation 
> optionality as it's empty default behavior means it will do nothing when 
> unimplemented mitigating backwards compatibility impact to exiting 
> interceptors.  Finally, the Kafka Consumer/Producer Interceptor containers 
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* 
> or *maybeAcquireResources* method which also throws a checked exception. 
> See below code excerpt for the Consumer/Producer constructor:
> {code:java}
> List> interceptorList = (List) 
> config.getConfiguredInstances(
> ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
> ConsumerInterceptor.class,
> Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we can
>  * Create a class which inherits or wraps AbstractConfig that contains a new 
> method which will return a ConfiguredInstanceResult class.  This 
> ConfiguredInstanceResult  class will contain an optional list of successfully 
> created interceptors and/or exception which occurred while calling each 
> Interceptor::configure.  Additionally, it will contain a helper method to 
> rethrow an exception as well as a method which returns the underlying 
> exception.  The caller is expected to handle the exception and perform clean 
> up e.g. call  Interceptor::close  on each interceptor in the list provided by 
> the ConfiguredInstanceResult class.
>  * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
> instances if/when a failure occurs
>  * Add a new overloaded {{getConfiguredInstance}} / 
> {{getConfiguredInstances}} variant that allows users to specify whether 
> already-instantiated classes should be closed or not when a failure occurs
>  * Add a new exception type to the public API that includes a list of all of 
> the successfully-instantiated (and/or successfully-configured) instances 
> before the error was encountered so that callers can choose how to handle the 
> failure however they want (and possibly so that instantiation/configuration 
> can be attempted on every class before throwing the exception)
>  
>  
>  
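
For illustration only, a rough sketch of what Proposal 1's optional lifecycle hook could 
look like; the names configureWithResources/maybeConfigureWithResources come from the 
proposal above and are not existing Kafka APIs:
{code:java}
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;

// Hypothetical default method on the interceptor interface (Proposal 1):
// resource-acquiring setup moves out of configure() so the container can
// guarantee close() is called on already-created interceptors if a later
// interceptor fails.
interface ResourceAwareConsumerInterceptor<K, V> extends ConsumerInterceptor<K, V> {
    default void configureWithResources() throws Exception {
        // no-op by default, preserving backwards compatibility
    }
}

// Hypothetical container-side hook: on failure, close every interceptor
// created so far before rethrowing.
final class InterceptorLifecycle {
    static <K, V> void maybeConfigureWithResources(
            List<ResourceAwareConsumerInterceptor<K, V>> interceptors) throws Exception {
        try {
            for (ResourceAwareConsumerInterceptor<K, V> interceptor : interceptors) {
                interceptor.configureWithResources();
            }
        } catch (Exception e) {
            interceptors.forEach(ResourceAwareConsumerInterceptor::close);
            throw e;
        }
    }
}
{code}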



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14565) Interceptor Resource Leak

2023-02-09 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

External issue URL: https://github.com/apache/kafka/pull/13168

> Interceptor Resource Leak
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> *AbstractConfig.getConfiguredInstances()*  is delegated responsibility for 
> both creating and configuring each interceptor listed in the 
> interceptor.classes property and returns a configured  
> *List>* interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()* 
>  method with no implementation and check exception on the respective 
> Consumer/Producer interceptor interfaces.  This method as a part the 
> interceptor life cycle management will be responsible for creating threads 
> and/or objects which utilizes threads, connections or other resource which 
> requires clean up.  Additionally, this default method enables implementation 
> optionality as it's empty default behavior means it will do nothing when 
> unimplemented mitigating backwards compatibility impact to exiting 
> interceptors.  Finally, the Kafka Consumer/Producer Interceptor containers 
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* 
> or *maybeAcquireResources* method which also throws a checked exception. 
> See below code excerpt for the Consumer/Producer constructor:
> {code:java}
> List> interceptorList = (List) 
> config.getConfiguredInstances(
> ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
> ConsumerInterceptor.class,
> Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we can
>  * Create a class which inherits or wraps AbstractConfig that contains a new 
> method which will return a ConfiguredInstanceResult class.  This 
> ConfiguredInstanceResult  class will contain an optional list of successfully 
> created interceptors and/or exception which occurred while calling each 
> Interceptor::configure.  Additionally, it will contain a helper method to 
> rethrow an exception as well as a method which returns the underlying 
> exception.  The caller is expected to handle the exception and perform clean 
> up e.g. call  Interceptor::close  on each interceptor in the list provided by 
> the ConfiguredInstanceResult class.
>  * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
> instances if/when a failure occurs
>  * Add a new overloaded {{getConfiguredInstance}} / 
> {{getConfiguredInstances}} variant that allows users to specify whether 
> already-instantiated classes should be closed or not when a failure occurs
>  * Add a new exception type to the public API that includes a list of all of 
> the successfully-instantiated (and/or successfully-configured) instances 
> before the error was encountered so that callers can choose how to handle the 
> failure however they want (and possibly so that instantiation/configuration 
> can be attempted on every class before throwing the exception)
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14703) Don't resign when failing to replay uncommitted records

2023-02-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14703:
---
Description: 
h1. Problem

The KRaft controller replays both committed and uncommitted records. 
Committed records are replayed by the inactive controller. Uncommitted records 
are replayed by the active controller.

When handling an RPC, the active controller generates a response and a list of 
uncommitted records. The active controller replays the uncommitted records 
before sending them to the KRaft layer for durability and replication. If the 
active controller encounters an error when replaying the uncommitted records, 
it calls the process exit fault handler.

Indirectly, the process exit fault handler resigns the controller's KRaft 
leadership and closes all of the client connections.

Most clients retry the RPC when they disconnect from the remote endpoint. If 
the RPC's replay error is deterministic, then it is possible for the failure to 
propagate to all of the controllers as they become leaders. This handling may 
cause the controllers to become unavailable.
h1. Solution

We can prevent this failure from propagating to all of the controllers by 
changing how we handle errors when replaying uncommitted records. The active 
controller doesn't need to fatally exit if it fails to replay an uncommitted 
record. The active controller should instead fail the RPC with an 
UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot taken 
before the RPC was handled. 
h1. Drawback

This solution doesn't work if the error is in the Timeline data structures 
themselves and the controller is unable to revert to the previous state via 
SnapshotRegistry::revertToSnapshot.

  was:
h1. Problem

The KRaft controller is replays both committed and uncommitted records. 
Committed records are replayed by the inactive controller. Uncommitted records 
are replayed by the active controller.

When handling an RPC the active controller generates a response and a list of 
uncommitted records. The active controller replays the uncommitted records 
before sending them to the KRaft layer for durability and replication. If the 
active controller encounters an error when replaying the uncommitted records, 
it calls the process exit fault handler.

Indirectly, the process exit fault handler resigns its KRaft leadership and 
closes all of the client connections.

Most clients to retry the RPC when they disconnect from the remote endpoint. If 
the RPC's replay error is deterministic then it is possible for the failure to 
propagate to all of the controllers as they become leaders. This handling may 
cause the controllers to become unavailable.
h1. Solution

We can avoid this failure from propagating to all of the controllers by 
changing how we handle errors when replaying uncommitted records. The active 
controller doesn't need to fatally exit, if it failed to replay an uncommitted 
record. The active controller should instead failed the RPC with an 
UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot before 
the RPC was handled.


> Don't resign when failing to replay uncommitted records
> ---
>
> Key: KAFKA-14703
> URL: https://issues.apache.org/jira/browse/KAFKA-14703
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: José Armando García Sancio
>Priority: Major
>
> h1. Problem
> The KRaft controller is replays both committed and uncommitted records. 
> Committed records are replayed by the inactive controller. Uncommitted 
> records are replayed by the active controller.
> When handling an RPC the active controller generates a response and a list of 
> uncommitted records. The active controller replays the uncommitted records 
> before sending them to the KRaft layer for durability and replication. If the 
> active controller encounters an error when replaying the uncommitted records, 
> it calls the process exit fault handler.
> Indirectly, the process exit fault handler resigns its KRaft leadership and 
> closes all of the client connections.
> Most clients to retry the RPC when they disconnect from the remote endpoint. 
> If the RPC's replay error is deterministic then it is possible for the 
> failure to propagate to all of the controllers as they become leaders. This 
> handling may cause the controllers to become unavailable.
> h1. Solution
> We can avoid this failure from propagating to all of the controllers by 
> changing how we handle errors when replaying uncommitted records. The active 
> controller doesn't need to fatally exit, if it failed to replay an 
> uncommitted record. The active controller should instead failed the RPC with 
> an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot 
> before t

[GitHub] [kafka] elarib closed pull request #5405: KAFKA-7189 : Add a Merge Transformer for Kafka Connect

2023-02-09 Thread via GitHub


elarib closed pull request #5405: KAFKA-7189 : Add a Merge Transformer for 
Kafka Connect
URL: https://github.com/apache/kafka/pull/5405


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-02-09 Thread via GitHub


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1424973150

   One concern is that the ConcurrentHashMap could take up more space. This 
metric was introduced to try to get this memory usage under control. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14703) Don't resign when failing to replay uncommitted records

2023-02-09 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686798#comment-17686798
 ] 

Colin McCabe commented on KAFKA-14703:
--

Sorry, but I don’t think this will work. We don’t know what state the in-memory 
data structures are in. Exiting is the only safe course of action.

It is unfortunate that we need to exit, but every bug is unfortunate. 
Controllers restart quickly so it should not be a big problem in practice.

> Don't resign when failing to replay uncommitted records
> ---
>
> Key: KAFKA-14703
> URL: https://issues.apache.org/jira/browse/KAFKA-14703
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: José Armando García Sancio
>Priority: Major
>
> h1. Problem
> The KRaft controller is replays both committed and uncommitted records. 
> Committed records are replayed by the inactive controller. Uncommitted 
> records are replayed by the active controller.
> When handling an RPC the active controller generates a response and a list of 
> uncommitted records. The active controller replays the uncommitted records 
> before sending them to the KRaft layer for durability and replication. If the 
> active controller encounters an error when replaying the uncommitted records, 
> it calls the process exit fault handler.
> Indirectly, the process exit fault handler resigns its KRaft leadership and 
> closes all of the client connections.
> Most clients to retry the RPC when they disconnect from the remote endpoint. 
> If the RPC's replay error is deterministic then it is possible for the 
> failure to propagate to all of the controllers as they become leaders. This 
> handling may cause the controllers to become unavailable.
> h1. Solution
> We can avoid this failure from propagating to all of the controllers by 
> changing how we handle errors when replaying uncommitted records. The active 
> controller doesn't need to fatally exit, if it failed to replay an 
> uncommitted record. The active controller should instead failed the RPC with 
> an UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot 
> before the RPC was handled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14546) Allow Partitioner to return -1 to indicate default partitioning

2023-02-09 Thread James Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686780#comment-17686780
 ] 

James Olsen commented on KAFKA-14546:
-

[~viktorsomogyi] Is this on anyone's radar?  This will become a show-stopper if 
DefaultPartitioner is eventually removed.

> Allow Partitioner to return -1 to indicate default partitioning
> ---
>
> Key: KAFKA-14546
> URL: https://issues.apache.org/jira/browse/KAFKA-14546
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 3.3.1
>Reporter: James Olsen
>Priority: Major
>
> Prior to KIP-794 it was possible to create a custom Partitioner that could 
> delegate to the DefaultPartitioner.  DefaultPartitioner has been deprecated 
> so we can now only delegate to BuiltInPartitioner.partitionForKey which does 
> not handle a non-keyed message.  Hence there is now no mechanism for a custom 
> Partitioner to fallback to default partitioning, e.g. for the non-keyed 
> sticky case.
> I would like to propose that KafkaProducer.partition(...) not throw 
> IllegalArgumentException if the Partitioner returns 
> RecordMetadata.UNKNOWN_PARTITION and instead continue with the default 
> behaviour.  Maybe with a configuration flag to enable this behaviour so as 
> not to break existing expectations?
> Why was Partitioner delegation with default fallback useful?
>  # A single Producer can be used to write to multiple Topics where each Topic 
> may have different partitioning requirements.  The Producer can only have a 
> single Partitioner so the Partitioner needs to be able to switch behaviour 
> based on the Topic, including the need to fallback to default behaviour if a 
> given Topic does not have a custom requirement.
>  # Multiple services may need to produce to the same Topic and these services 
> may be authored by different teams.  A single custom Partitioner that 
> encapsulates all Topic specific partitioning logic can be used by all teams 
> at all times for all Topics ensuring that mistakes are not made.
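
For illustration, a sketch of the kind of delegating partitioner the description 
envisions under the proposed behaviour (returning RecordMetadata.UNKNOWN_PARTITION to 
mean "fall back to the producer's default partitioning"; this is the proposal, not 
current producer behaviour, and the topic name is made up):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class TopicAwarePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Custom logic only for one topic; everything else signals "use the default".
        if ("orders".equals(topic) && keyBytes != null) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        return RecordMetadata.UNKNOWN_PARTITION; // proposed: producer applies default partitioning
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
{code}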



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14703) Don't resign when failing to replay uncommitted records

2023-02-09 Thread Jira
José Armando García Sancio created KAFKA-14703:
--

 Summary: Don't resign when failing to replay uncommitted records
 Key: KAFKA-14703
 URL: https://issues.apache.org/jira/browse/KAFKA-14703
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Reporter: José Armando García Sancio


h1. Problem

The KRaft controller replays both committed and uncommitted records. 
Committed records are replayed by the inactive controller. Uncommitted records 
are replayed by the active controller.

When handling an RPC, the active controller generates a response and a list of 
uncommitted records. The active controller replays the uncommitted records 
before sending them to the KRaft layer for durability and replication. If the 
active controller encounters an error when replaying the uncommitted records, 
it calls the process exit fault handler.

Indirectly, the process exit fault handler resigns the controller's KRaft 
leadership and closes all of the client connections.

Most clients retry the RPC when they disconnect from the remote endpoint. If 
the RPC's replay error is deterministic, then it is possible for the failure to 
propagate to all of the controllers as they become leaders. This handling may 
cause the controllers to become unavailable.
h1. Solution

We can prevent this failure from propagating to all of the controllers by 
changing how we handle errors when replaying uncommitted records. The active 
controller doesn't need to fatally exit if it fails to replay an uncommitted 
record. The active controller should instead fail the RPC with an 
UNKNOWN_ERROR and revert the in-memory state to the in-memory snapshot taken 
before the RPC was handled.
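
For illustration only, a rough sketch (not the actual QuorumController code) of what the 
proposed revert-on-replay-failure handling could look like, assuming a snapshot registry 
that can roll the timeline data structures back to the epoch captured before the RPC:
{code:java}
import org.apache.kafka.common.protocol.Errors;

final class ReplayFailureHandlingSketch {
    interface SnapshotRegistryLike {
        void revertToSnapshot(long epoch); // stands in for SnapshotRegistry::revertToSnapshot
    }

    // Instead of invoking the process-exit fault handler, revert the in-memory
    // state and fail only the RPC whose uncommitted records could not be replayed.
    static Errors replayUncommitted(SnapshotRegistryLike snapshots,
                                    long epochBeforeRpc,
                                    Runnable replayRecords) {
        try {
            replayRecords.run();                         // apply the uncommitted records
            return Errors.NONE;
        } catch (RuntimeException e) {
            snapshots.revertToSnapshot(epochBeforeRpc);  // undo the partial replay
            return Errors.UNKNOWN_SERVER_ERROR;          // fail just this RPC
        }
    }
}
{code}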



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

2023-02-09 Thread via GitHub


C0urante commented on code in PR #12366:
URL: https://github.com/apache/kafka/pull/12366#discussion_r1101986943


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -188,11 +196,52 @@ public ConfigDef config() {
 return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
 }
 
+@Override
+public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+List<ConfigValue> configValues = MirrorSourceConfig.CONNECTOR_CONFIG_DEF.validate(props);
+if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+if (!consumerUsesReadCommitted(props)) {
+ConfigValue exactlyOnceSupport = configValues.stream()
+.filter(cv -> 
EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+configValues.add(result);
+return result;
+});
+// The Connect framework will already generate an error for 
this property if we return ExactlyOnceSupport.UNSUPPORTED
+// from our exactlyOnceSupport method, but it will be fairly 
generic
+// We add a second error message here to give users more 
insight into why this specific connector can't support exactly-once
+// guarantees with the given configuration
+exactlyOnceSupport.addErrorMessage(
+"MirrorSourceConnector can only provide exactly-once 
guarantees when its source consumer is configured with "
++ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " 
set to '" + READ_COMMITTED + "'; "
++ "otherwise, records from aborted and 
uncommitted transactions will be replicated from the "
++ "source cluster to the target cluster."
+);
+}
+}
+return new org.apache.kafka.common.config.Config(configValues);
+}
+
 @Override
 public String version() {
 return AppInfoParser.getVersion();
 }
 
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+return consumerUsesReadCommitted(props)
+? ExactlyOnceSupport.SUPPORTED
+: ExactlyOnceSupport.UNSUPPORTED;
+}
+
+private boolean consumerUsesReadCommitted(Map<String, String> props) {
+MirrorSourceConfig config = new MirrorSourceConfig(props);

Review Comment:
   This turned out to be much trickier to address than expected, but I've made 
this method impervious to unrelated invalid properties.
   
   This required being able to determine the full config used for the source 
consumer without constructing a `MirrorSourceConfig` instance, which in turn 
required (or at least, was facilitated by) pulling out the 
find-all-entries-with-prefix logic in `AbstractConfig::originalsWithPrefix` 
into a static method in `Utils`.
   
   I couldn't think of a less invasive way to accomplish this, but it does seem 
worth the tradeoff; LMKWYT.
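
   For readers not following the linked diff, a small sketch of the kind of static 
prefix-extraction helper being described (the helper name and exact signature here are 
assumptions; it simply mirrors what `AbstractConfig::originalsWithPrefix` does):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class PrefixExtractionSketch {
       // Returns all entries whose keys start with the prefix, with the prefix
       // stripped, so e.g. the source consumer config can be derived from
       // "source.consumer."-prefixed properties without first constructing a
       // full MirrorSourceConfig.
       static Map<String, Object> entriesWithPrefix(Map<String, ?> props, String prefix) {
           Map<String, Object> result = new HashMap<>();
           for (Map.Entry<String, ?> entry : props.entrySet()) {
               if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
                   result.put(entry.getKey().substring(prefix.length()), entry.getValue());
               }
           }
           return result;
       }
   }
   ```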



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

2023-02-09 Thread via GitHub


C0urante commented on code in PR #12366:
URL: https://github.com/apache/kafka/pull/12366#discussion_r1101982892


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -188,11 +196,52 @@ public ConfigDef config() {
 return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
 }
 
+@Override
+public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+List<ConfigValue> configValues = MirrorSourceConfig.CONNECTOR_CONFIG_DEF.validate(props);

Review Comment:
   Fine by me, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-02-09 Thread via GitHub


littlehorse-eng commented on code in PR #13082:
URL: https://github.com/apache/kafka/pull/13082#discussion_r1101981008


##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the 
configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
-  traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
-  for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+  
+The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the purpose of 
keeping
+the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
+traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
+for the reassigned warmups to restore sufficient state for 
them to be transitioned to active tasks. Must be at least 1.
+  
+  
+Note that one warmup replica corresponds to one Stream Task. 
Furthermore, note that each warmup replica can only be promoted to an active 
Task during
+a rebalance (normally a Probing Rebalance, which occur at a 
frequency specified by the
+probing.rebalance.interval.ms config). This means 
that the
+maximum rate at which Stream Tasks can be migrated from 
over-burdened Streams Instances to fresher Streams Instances can be determined 
by
+(max.warmup.replicas /
+probing.rebalance.interval.ms). If it takes longer 
than the probing rebalance interval
+for the data for a Task to be migrated, then that rate will be 
lower.

Review Comment:
   Thanks for the review!
   
   What I meant here is that, if it takes a long time (e.g. 60 minutes) for a 
Task to catch up, then decreasing the probing rebalance interval from, let's 
say, 30 minutes to 10 minutes won't have any effect on the speed at which you 
catch up.
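   
   For a concrete illustration of the rate formula in the doc change (numbers chosen 
arbitrarily): with max.warmup.replicas = 4 and probing.rebalance.interval.ms = 600000 
(10 minutes), at most 4 warmup tasks can be promoted per probing rebalance, i.e. roughly 
24 task migrations per hour, and only if each warmup task finishes restoring within its 
10-minute interval; if restoration takes 60 minutes, the effective rate is lower 
regardless of the interval setting.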



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-02-09 Thread via GitHub


mjsax commented on code in PR #13082:
URL: https://github.com/apache/kafka/pull/13082#discussion_r1101930903


##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the 
configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
-  traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
-  for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+  
+The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the purpose of 
keeping

Review Comment:
   ```suggestion
   The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the 
purpose of keeping
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the 
configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
-  traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
-  for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+  
+The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the purpose of 
keeping
+the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
+traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
+for the reassigned warmups to restore sufficient state for 
them to be transitioned to active tasks. Must be at least 1.
+  
+  
+Note that one warmup replica corresponds to one Stream Task. 
Furthermore, note that each warmup replica can only be promoted to an active 
Task during

Review Comment:
   ```suggestion
   Note that one warmup replica corresponds to one Stream Task. 
Furthermore, note that each warmup task can only be promoted to an active task 
during
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the 
configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
-  traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
-  for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+  
+The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the purpose of 
keeping
+the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
+traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
+for the reassigned warmups to restore sufficient state for 
them to be transitioned to active tasks. Must be at least 1.
+  
+  
+Note that one warmup replica corresponds to one Stream Task. 
Furthermore, note that each warmup replica can only be promoted to an active 
Task during
+a rebalance (normally a Probing Rebalance, which occur at a 
frequency specified by the

Review Comment:
   ```suggestion
a rebalance (normally a so-called probing rebalance, w

[GitHub] [kafka] cmccabe opened a new pull request, #13226: MINOR: Remove references to HIGHEST_SUPPORTED_VERSION from ZkMigrationClient

2023-02-09 Thread via GitHub


cmccabe opened a new pull request, #13226:
URL: https://github.com/apache/kafka/pull/13226

   Do not use HIGHEST_SUPPORTED_VERSION in ZkMigrationClient because
   it will do the wrong thing when more MV options are added in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


Hangleton commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424741428

   Thank you for your review Frederico (@fvaleri). I pushed the changes to 
address all your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101958375


##
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} generated from lines read on the standard 
input.
+ */
+public class ConsoleProducer {
+public static void main(String[] args) {
+ConsoleProducer consoleProducer = new ConsoleProducer();
+consoleProducer.start(args);
+}
+
+void start(String[] args) {
+try {
+ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+MessageReader reader = createMessageReader(config);
+reader.init(System.in, config.getReaderProps());
+
+KafkaProducer<byte[], byte[]> producer = createKafkaProducer(config.getProducerProps());
+Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+ProducerRecord<byte[], byte[]> record;
+do {
+record = reader.readMessage();
+if (record != null) {
+send(producer, record, config.sync());
+}
+} while (record != null);
+
+} catch (OptionException e) {
+System.err.println(e.getMessage());
+Exit.exit(1);
+
+} catch (Exception e) {
+e.printStackTrace();
+Exit.exit(1);
+}
+Exit.exit(0);
+}
+
+// VisibleForTesting
+KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+return new KafkaProducer<>(props);
+}
+
+// VisibleForTesting
+MessageReader createMessageReader(ConsoleProducerConfig config) throws 
ReflectiveOperationException {
+return (MessageReader) 
Class.forName(config.reader

[GitHub] [kafka] guozhangwang commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


guozhangwang commented on PR #13202:
URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424717333

   > @guozhangwang If you don't mind, I will do it as a follow-up. Created 
https://issues.apache.org/jira/browse/KAFKA-14702 to track this.
   
   Yeah totally, like I said it's "independent of this PR" :) Feel free to 
merge this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14701) More broker side partition assignor to common

2023-02-09 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14701:

Summary: More broker side partition assignor to common  (was: More broker 
side assignor to common)

> More broker side partition assignor to common
> -
>
> Key: KAFKA-14701
> URL: https://issues.apache.org/jira/browse/KAFKA-14701
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> Before releasing KIP-848, we need to move the server side partition assignor 
> to its final location in common.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14701) More broker side assignor to common

2023-02-09 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14701:

Description: Before releasing KIP-848, we need to move the server side 
partition assignor to its final location in common.

> More broker side assignor to common
> ---
>
> Key: KAFKA-14701
> URL: https://issues.apache.org/jira/browse/KAFKA-14701
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> Before releasing KIP-848, we need to move the server side partition assignor 
> to its final location in common.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


vcrfxia commented on PR #13189:
URL: https://github.com/apache/kafka/pull/13189#issuecomment-1424701063

   Thanks for your review, @mjsax ! Responded to your comments inline and also 
pushed a commit just now containing the requested changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101924823


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   I think that I will keep the current name for now. I am not fully convinced 
by `GroupCoordinatorPartitionAssignor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101924540


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-throw new UnsupportedOperationException("not yet implemented");
+// advance stream time to the max timestamp in the batch
+for (final ConsumerRecord<byte[], byte[]> record : records) {
+observedStreamTime = Math.max(observedStreamTime, 
record.timestamp());
+}
+
+final VersionedStoreClient restoreClient = 
restoreWriteBuffer.getClient();

Review Comment:
   The VersionedStoreSegment implementation used by the restore client 
(WriteBufferSegmentWithDbFallback) is currently private to the write buffer 
class, since the RocksDBVersionedStore doesn't care what the type is; all the 
outer class needs are the methods provided by the RocksDBVersionedStoreClient 
interface itself. So I've left the type out in order to avoid polluting the 
outer class with extra info it doesn't need.
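
   As a generic sketch of that encapsulation choice (the names below are hypothetical, not the actual Streams classes), the concrete segment type stays private while callers only ever see the interface:

```java
// Hypothetical illustration of hiding a concrete segment type behind an interface.
interface Segment {
    void put(String key, byte[] value);
}

class SegmentStore {
    // The concrete type is private to this class; callers never see it.
    private static class InMemorySegment implements Segment {
        private final java.util.Map<String, byte[]> data = new java.util.HashMap<>();

        @Override
        public void put(final String key, final byte[] value) {
            data.put(key, value);
        }
    }

    Segment newSegment() {
        return new InMemorySegment();
    }
}
```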



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = 
LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to 
track tombstones
+// which must be written to the underlying store.
+private final Map> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in 
reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap 
segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient 
dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying 
persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final 
BatchWritingVersionedStoreClient dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more 
efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing

[GitHub] [kafka] dajac commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac commented on PR #13202:
URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424699307

   @guozhangwang If you don't mind, I will do it as a follow-up. Created 
https://issues.apache.org/jira/browse/KAFKA-14702 to track this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14701) Move broker side assignor to common

2023-02-09 Thread David Jacot (Jira)
David Jacot created KAFKA-14701:
---

 Summary: Move broker side assignor to common
 Key: KAFKA-14701
 URL: https://issues.apache.org/jira/browse/KAFKA-14701
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14702) Extend server side assignor to support rack aware replica placement

2023-02-09 Thread David Jacot (Jira)
David Jacot created KAFKA-14702:
---

 Summary: Extend server side assignor to support rack aware replica 
placement
 Key: KAFKA-14702
 URL: https://issues.apache.org/jira/browse/KAFKA-14702
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac commented on PR #13202:
URL: https://github.com/apache/kafka/pull/13202#issuecomment-1424692244

   @guozhangwang That's a good point. We need to add the list of replicas with their rack ids to `AssignmentTopicMetadata`; that would replace `numPartitions`. We missed this part in KIP-848, but we already added `rackId` to `AssignmentMemberSpec`.
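
   As a rough sketch of that shape (hypothetical names, not the committed KIP-848 API), the topic metadata could carry per-partition replica racks instead of a bare partition count:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: replaces the numPartitions field with per-partition
// replica rack information so that assignors can be rack aware.
public class AssignmentTopicMetadata {
    // partition id -> rack ids of the replicas hosting that partition
    private final Map<Integer, Set<String>> partitionRacks;

    public AssignmentTopicMetadata(final Map<Integer, Set<String>> partitionRacks) {
        this.partitionRacks = Objects.requireNonNull(partitionRacks);
    }

    public int numPartitions() {
        return partitionRacks.size();
    }

    public Set<String> racksForPartition(final int partition) {
        return partitionRacks.getOrDefault(partition, Collections.emptySet());
    }
}
```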


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14700) Produce request interceptors

2023-02-09 Thread David Mariassy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mariassy updated KAFKA-14700:
---
Description: See 
[KIP-905|https://cwiki.apache.org/confluence/display/KAFKA/KIP-905%3A+Broker+interceptors]
  (was: See KIP-905)

> Produce request interceptors
> 
>
> Key: KAFKA-14700
> URL: https://issues.apache.org/jira/browse/KAFKA-14700
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: David Mariassy
>Assignee: David Mariassy
>Priority: Major
>
> See 
> [KIP-905|https://cwiki.apache.org/confluence/display/KAFKA/KIP-905%3A+Broker+interceptors]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101915894


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = 
LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to 
track tombstones
+// which must be written to the underlying store.
+private final Map> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in 
reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap 
segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient 
dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying 
persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final 
BatchWritingVersionedStoreClient dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more 
efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and 
clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a 
{@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always 
writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : 
allSegments) {
+final LogicalKeyValueSegment dbSegment = 
bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : 
bufferSegment.getAll()) {
+dbSegment.add

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101914047


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -333,10 +384,34 @@ interface VersionedStoreSegment {
 long segmentIdForTimestamp(long timestamp);
 }
 
+/**
+ * A {@link VersionedStoreClient} which additionally supports batch writes 
into its latest
+ * value store.
+ *
+ * @param <T> the segment type used by this client
+ */
+interface BatchWritingVersionedStoreClient<T extends VersionedStoreSegment> extends VersionedStoreClient<T> {

Review Comment:
   We could, yeah. It just means we'd need to make RocksDBVersionedStoreClient 
non-private and require that RocksDBVersionedStoreRestoreWriteBuffer is 
specifically passed an instance of RocksDBVersionedStoreClient rather than any 
BatchWritingVersionedStoreClient. I'll make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101910583


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part 
of an ongoing
+// put() procedure. any value which is not a valid record timestamp will 
do.
+private stat

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1101902156


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete
+ * @param timestamp The timestamp for this record version
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+void put(K key, V value, long timestamp);
+
+/**
+ * Delete the value associated with this key from the store, at the 
specified timestamp
+ * (if there is such a value), and return the deleted value.
+ * 
+ * This operation is semantically equivalent to {@link #get(Object, long)} 
#get(key, timestamp))}
+ * followed by {@link #put(Object, Object, long) #put(key, null, 
timestamp)}.
+ *
+ * @param key   The key
+ * @param timestamp The timestamp for this delete
+ * @return The value and timestamp of the latest record associated with 
this key
+ * as of the deletion timestamp (inclusive), or {@code null} if 
any of
+ * (1) the store contains no records for this key, (2) the latest 
record
+ * for this key as of the deletion timestamp is a tombstone, or
+ * (3) the deletion timestamp is older than this store's history 
retention
+ * (i.e., this store no longer contains data for the provided 
timestamp).
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+VersionedRecord<V> delete(K key, long timestamp);
+
+/**
+ * Get the latest (by timestamp) record associated with this key.
+ *
+ * @param key The key to fetch
+ * @return The value and timestamp of the latest record associated with 
this key, or
+ * {@code null} if either (1) the store contains no records for 
this key or (2) the
+ * latest record for this key is a tombstone.
+ * @throws NullPointerException   If null is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized
+ */
+VersionedRecord<V> get(K key);
+
+/**
+ * Get the latest record associated with this key with timestamp not 
exceeding the specified
+ * timestamp bound.

Review Comment:
   Hm. Currently "validFrom" and "validTo" are not mentioned anywhere in the 
public-facing interfaces (VersionedKeyValueStore, VersionedRecord, etc) in 
javadocs or as parameter names; they are only "officially" introduced as 
concepts in the javadocs for RocksDBVersionedStore and its implementation 
helpers. I'm a bit hesitant to introduce these concepts in the public-facing 
docs at this time if the only purpose is to clarify what's returned from 
`get(key, asOfTimestamp)`, particularly because during KIP discussion one 
p

[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101900196


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   Yeah I was just thinking about that, but didn't know if the name would be 
awkward. Thinking about it though, it might be better to clarify through naming 
to save headaches in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101899467


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface PartitionAssignor {
+

Review Comment:
   Seems reasonable to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1101897601


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part 
of an ongoing
+// put() procedure. any value which is not a valid record timestamp will 
do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final

[GitHub] [kafka] lmr3796 commented on pull request #13187: MINOR: Log lastCaughtUpTime on ISR shrinkage

2023-02-09 Thread via GitHub


lmr3796 commented on PR #13187:
URL: https://github.com/apache/kafka/pull/13187#issuecomment-1424662401

   @clolov for visibility


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101889893


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   That’s right. We could perhaps name this one 
GroupCoordinatorPartitionAssignor to make the difference clearer. Having the 
same name seems confusing. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101888535


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface PartitionAssignor {
+

Review Comment:
   Yeah. I started like this but I found it too heavy. What do you think?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a consumer group.
+ */
+public class AssignmentSpec {
+/**
+ * The members keyed by member id.
+ */
+final Map members;
+
+/**
+ * The topics' metadata keyed by topic id
+ */
+final Map topics;
+
+public AssignmentSpec(
+Map members,
+Map topics
+) {
+Objects.requireNonNull(members);
+Objects.requireNonNull(topics);
+

Review Comment:
   Will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101887714


##
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link 
InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link 
ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   Apologies, that is right. Thanks for correcting. On that note, this 
interface moved from `kafka.common.MessageReader` to 
`org.apache.kafka.tools.MessageReader`, so any foreign implementation will have to 
be updated accordingly. What is the convention to follow in this case - 
preserve the same package to avoid breaking dependencies, or change the package 
to match the new structure?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-09 Thread via GitHub


vcrfxia commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1101882173


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete

Review Comment:
   Hm... I found this 
[comment](https://github.com/apache/kafka/blob/083e11a22ca9966ed010acdd5705351ab4300b52/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L87-L89)
 in the code the other day, which gives the impression that it is valid to 
serialize a non-null value to null bytes:
   > // Serializing non-null values to null can be useful when working with 
Optional-like values
   > // where the Optional.empty case is serialized to null.
   > // See the discussion here: https://github.com/apache/kafka/pull/7679
   
   Is this no longer true?
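
   For context, the pattern that comment refers to can be sketched with a hypothetical serializer (not an existing Kafka class) in which a non-null in-memory value serializes to null bytes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: Optional.empty() is a non-null value whose
// serialized form is null bytes.
public class OptionalStringSerializer implements Serializer<Optional<String>> {

    @Override
    public byte[] serialize(final String topic, final Optional<String> data) {
        if (data == null || !data.isPresent()) {
            return null; // null serialized bytes for a non-null value
        }
        return data.get().getBytes(StandardCharsets.UTF_8);
    }
}
```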



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14696) CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL JAAS JndiLoginModule configuration using Kafka Connect

2023-02-09 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686696#comment-17686696
 ] 

Manikumar commented on KAFKA-14696:
---

There are no plans to provide a patch for older versions (< 3.4.0).

We have given some advice as part of the CVE announcement: 
https://lists.apache.org/thread/vy1c7fqcdqvq5grcqp6q5jyyb302khyz

> CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL 
> JAAS JndiLoginModule configuration using Kafka Connect
> ---
>
> Key: KAFKA-14696
> URL: https://issues.apache.org/jira/browse/KAFKA-14696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.8.1, 2.8.2
>Reporter: MillieZhang
>Priority: Major
> Fix For: 3.4.0
>
>
> CVE Reference: [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-34917]
>  
> Will Kafka 2.8.X provide a patch to fix this vulnerability?
> If yes, when will the patch be provided?
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang merged pull request #13167: KAFKA-14650: Synchronize access to tasks inside task manager

2023-02-09 Thread via GitHub


guozhangwang merged PR #13167:
URL: https://github.com/apache/kafka/pull/13167


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown

2023-02-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio reassigned KAFKA-14693:
--

Assignee: José Armando García Sancio

> KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown
> -
>
> Key: KAFKA-14693
> URL: https://issues.apache.org/jira/browse/KAFKA-14693
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.4.0
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Critical
> Fix For: 3.4.1
>
>
> h1. Problem
> When the KRaft controller encounters an error that it cannot handle, it calls 
> {{ProcessExitingFaultHandler}}, which calls {{Exit.exit}}, which calls 
> {{{}Runtime.exit{}}}.
> Based on the Runtime.exit documentation:
> {quote}All registered [shutdown 
> hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-],
>  if any, are started in some unspecified order and allowed to run 
> concurrently until they finish. Once this is done the virtual machine 
> [halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-].
> {quote}
> One of the shutdown hooks registered by Kafka is {{{}Server.shutdown(){}}}. 
> This shutdown hook eventually calls {{{}KafkaEventQueue.close{}}}. This last 
> close method joins on the controller thread. Unfortunately, the controller 
> thread also joined waiting for the shutdown hook thread to finish.
> Here are sample thread stacks:
> {code:java}
>"QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms 
> elapsed=620807.49s allocated=38544M defined_classes=353 
> tid=0x7f5aeb31f800 nid=0x80c in Object.wait()  [0x7f5a658fb000]
>  java.lang.Thread.State: WAITING (on object monitor)  
>   
>   
>   at java.lang.Object.wait(java.base@17.0.5/Native 
> Method)
>   - waiting on 
>   at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
>   - locked <0xa29241f8> (a 
> org.apache.kafka.common.utils.KafkaThread)
>   at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
>   at 
> java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107)
>   at 
> java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46)
>   at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130)
>   at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173)
>   - locked <0xffe020b8> (a java.lang.Class for 
> java.lang.Shutdown)
>   at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115)
>   at java.lang.System.exit(java.base@17.0.5/System.java:1860)
>   at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
>   at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
>   at org.apache.kafka.common.utils.Exit.exit(Exit.java:62)
>   at 
> org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54)
>   at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891)
>   at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874)
>   at 
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969){code}
> and
> {code:java}
>   "kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s 
> allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in 
> Object.wait()  [0x7f5a47afd000]
>  java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(java.base@17.0.5/Native Method)
>   - waiting on 
>   at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
>   - locked <0xa272bcb0> (a 
> org.apache.kafka.common.utils.KafkaThread)
>   at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
>   at 
> org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509)
>   at 
> org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553)
>   at 
> kafka.server.ControllerServer.shutdown(ControllerServer.scala:521)
>   at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184)
>   at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99)
>   at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown 
> Source)
>   at kafk
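
The deadlock described in this issue can be illustrated with a small, self-contained sketch (thread and class names below are placeholders, not the Kafka code): one thread calls System.exit(), which waits for shutdown hooks to finish, while a shutdown hook joins that same thread.

```java
public class ShutdownHookDeadlockSketch {

    public static void main(final String[] args) throws InterruptedException {
        // Stand-in for the controller event handler thread that hits a fatal fault.
        final Thread controller = new Thread(() -> {
            // System.exit() runs the shutdown hooks and waits for them to finish.
            System.exit(1);
        }, "controller-event-handler");

        // Stand-in for the kafka-shutdown-hook: it joins the controller thread,
        // but that thread is blocked inside System.exit() waiting for this hook,
        // so neither can make progress and the JVM never exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                controller.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "shutdown-hook"));

        controller.start();
        controller.join();
    }
}
```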

[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-09 Thread via GitHub


junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1101875350


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = 
LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   Thanks, Satish. Agreed. Since this one will be implemented as async 
eventually, there is probably no need to set LogContext. We can keep this as 
static. 
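
   For reference, the two options discussed can be sketched as follows; the class name is a placeholder, while `LogContext` is the existing `org.apache.kafka.common.utils.LogContext` helper:

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the two logging alternatives mentioned above.
class TierStateMachineLoggingSketch {
    // Option kept in the PR: one static logger shared by all instances.
    private static final Logger STATIC_LOG =
            LoggerFactory.getLogger(TierStateMachineLoggingSketch.class);

    // Alternative: a per-instance logger whose messages carry a prefix
    // (for example the broker id) supplied through a LogContext.
    private final Logger contextLog;

    TierStateMachineLoggingSketch(final int brokerId) {
        this.contextLog = new LogContext("[TierStateMachine brokerId=" + brokerId + "] ")
                .logger(TierStateMachineLoggingSketch.class);
    }
}
```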



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101869241


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface PartitionAssignor {
+

Review Comment:
   I saw that we don't define the POJOs here in your comment -- to confirm, we 
no longer want to do this -- and it's not something we plan to change in the 
future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101869241


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface PartitionAssignor {
+

Review Comment:
   I saw in your comment that we don't define the POJOs here -- to confirm, we 
no longer want to do this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-09 Thread via GitHub


mumrah merged PR #13183:
URL: https://github.com/apache/kafka/pull/13183


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101863484


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   The difference here is the packages they will be in?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101861603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupAssignment.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The partition assignment for a consumer group.
+ */
+public class GroupAssignment {
+/**
+ * The member assignments keyed by member id.
+ */
+final Map members;
+
+public GroupAssignment(
+Map members
+) {
+Objects.requireNonNull(members);
+this.members = members;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+GroupAssignment that = (GroupAssignment) o;
+
+return members.equals(that.members);

Review Comment:
   does this line just cover the above lines? Ditto for MemberAssignment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-09 Thread via GitHub


mumrah commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1101861663


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -274,11 +310,6 @@ public void run() throws Exception {
 new EventQueue.DeadlineFunction(deadline),
 new PollEvent());
 }
-

Review Comment:
   The base MigrationEvent will do the error logging. I was just removing some 
redundant overrides. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-09 Thread via GitHub


jolshan commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1101847493


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a consumer group.
+ */
+public class AssignmentSpec {
+/**
+ * The members keyed by member id.
+ */
+final Map members;
+
+/**
+ * The topics' metadata keyed by topic id
+ */
+final Map topics;
+
+public AssignmentSpec(
+Map members,
+Map topics
+) {
+Objects.requireNonNull(members);
+Objects.requireNonNull(topics);
+

Review Comment:
   Extremely small nit: any reason why we include a newline here but not in the 
other specs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2023-02-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686677#comment-17686677
 ] 

Philip Nee commented on KAFKA-6793:
---

We're also observing the same issue.  

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Assignee: Philip Nee
>Priority: Minor
>
> When upgraded KafkaStreams from 0.11.0.2 to 1.1.0 the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] akhileshchg commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-09 Thread via GitHub


akhileshchg commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1101829449


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -274,11 +310,6 @@ public void run() throws Exception {
 new EventQueue.DeadlineFunction(deadline),
 new PollEvent());
 }
-

Review Comment:
   Why are we not handling exceptions in other events?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13225: KAFKA-6793

2023-02-09 Thread via GitHub


philipnee opened a new pull request, #13225:
URL: https://github.com/apache/kafka/pull/13225

   Reviving this old discussion; apparently the PR was closed, so I reopened 
another one.
   
   **Issue**: Recently we've encountered complaints about the warning being too 
noisy, therefore we want to reduce it to debug level.
   
   The original PR was closed in: 
https://issues.apache.org/jira/browse/KAFKA-6793
   
   **Questions**
   Is this log useful at all? Should we just remove it?
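
   For reference, a minimal, standalone sketch (not part of this PR) showing how 
the warning is triggered -- any property that ConsumerConfig does not recognize, 
such as the 'admin.retries' value forwarded by Streams, is reported by the 
unused-config check when the consumer is constructed:

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;

   public class UnknownConfigWarningSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

           // Not a known consumer config: logged as "The configuration 'admin.retries'
           // was supplied but isn't a known config." during construction.
           props.put("admin.retries", "5");

           // The log line is emitted while the consumer is being constructed;
           // no poll() call is needed to see it.
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // nothing else to do for this demonstration
           }
       }
   }
   ```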


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dmariassy opened a new pull request, #13224: KAFKA-14700: Produce request interceptors

2023-02-09 Thread via GitHub


dmariassy opened a new pull request, #13224:
URL: https://github.com/apache/kafka/pull/13224

   First stab at implementing produce request interceptors as defined in KIP-905
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-6793) Unnecessary warning log message

2023-02-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-6793:
-

Assignee: Philip Nee

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Assignee: Philip Nee
>Priority: Minor
>
> When upgraded KafkaStreams from 0.11.0.2 to 1.1.0 the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14700) Produce request interceptors

2023-02-09 Thread David Mariassy (Jira)
David Mariassy created KAFKA-14700:
--

 Summary: Produce request interceptors
 Key: KAFKA-14700
 URL: https://issues.apache.org/jira/browse/KAFKA-14700
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: David Mariassy
Assignee: David Mariassy


See KIP-905



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


fvaleri commented on PR #13214:
URL: https://github.com/apache/kafka/pull/13214#issuecomment-1424569047

   @showuon if you have some time, this looks almost ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101811769


##
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link 
InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link 
ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   This would be a breaking change, because there is an option that allows you to 
provide your own message reader implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-09 Thread via GitHub


fvaleri commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1101716594


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -16,17 +16,29 @@
  */
 package org.apache.kafka.server.util;
 
+import joptsimple.OptionParser;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.stream;
+
 public class ToolsUtils {
 
+public static void validatePortOrDie(OptionParser parser, String hostPort) 
{

Review Comment:
   ```suggestion
   public static void validatePortOrExit(OptionParser parser, String 
hostPort) {
   ```



##
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.server.util.CommandLineUtils.maybeMergeOptions;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+/**
+ * Sends {@link ProducerRecord} generated from lines read on the standard 
input.
+ */
+public class ConsoleProducer {
+public static void main(String[] args) {
+ConsoleProducer consoleProducer = new ConsoleProducer();
+consoleProducer.start(args);
+}
+
+void start(String[] args) {
+try {
+ConsoleProducerConfig config = new ConsoleProducerConfig(args);
+MessageReader reader = createMessageReader(config);
+reader.init(System.in, config.getReaderProps());
+
+KafkaProducer producer = 
createKafkaProducer(config.getProducerProps());
+Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+ProducerRecord record;
+do {
+record = reader.readMessag

[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-09 Thread via GitHub


C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1424539164

   @mimaison This is a moderately large change in behavior and if possible, 
it'd be nice to get another set of eyes on it before merging. We don't need 
another reviewer for the PR changes (although comments are always welcome); 
instead, I'd just like confirmation that this change is safe to make as a bug 
fix.
   
   TL;DR: If an upstream consumer group is ahead of the upstream offset for the 
latest-emitted checkpoint, we will only sync offsets for that consumer group to 
the downstream cluster based on the offset pair for that checkpoint, instead of 
adding the delta of (upstream offset for consumer group - upstream offset in 
checkpoint), since there is no guarantee that that delta will be accurate in 
cases where the upstream topic is compacted, has transaction markers, or has 
some records filtered out via SMT.
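
   A rough sketch of the idea (names here are made up for illustration; this is 
not the actual MirrorCheckpointTask code):

   ```java
   // Illustrative only: shows the clamping behavior described above.
   final class CheckpointClampSketch {
       /**
        * Returns the downstream offset to sync for a consumer group, given the
        * latest-emitted checkpoint's upstream/downstream offset pair.
        */
       static long syncedDownstreamOffset(long upstreamGroupOffset,
                                          long checkpointUpstreamOffset,
                                          long checkpointDownstreamOffset) {
           if (upstreamGroupOffset >= checkpointUpstreamOffset) {
               // Group is at or ahead of the checkpoint: sync only up to the
               // checkpoint's downstream offset. Adding the upstream delta
               // (upstreamGroupOffset - checkpointUpstreamOffset) is unsafe when
               // the upstream topic is compacted, contains transaction markers,
               // or has records filtered out via SMT.
               return checkpointDownstreamOffset;
           }
           // Group is behind the checkpoint; in this sketch -1 simply means "no
           // sync from this checkpoint" (handling of this case is outside the
           // summary above).
           return -1L;
       }
   }
   ```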


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-09 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1101777913


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.clients.admin.ScramMechanism;
+
+// import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+// import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the SCRAM credentials in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramImage {
+public static final ScramImage EMPTY = new 
ScramImage(Collections.emptyMap());
+
+private final Map> 
mechanisms;
+
+public ScramImage(Map> 
mechanisms) {
+this.mechanisms = Collections.unmodifiableMap(mechanisms);
+}
+
+public void write(ImageWriter writer, ImageWriterOptions options) {
+for (Entry> 
mechanismEntry : mechanisms.entrySet()) {
+for (Entry userEntry : 
mechanismEntry.getValue().entrySet()) {
+writer.write(0, 
userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey()));
+}
+}
+}
+
+public Map> mechanisms() {
+return mechanisms;
+}
+
+public boolean isEmpty() {
+return mechanisms.isEmpty();
+}
+
+@Override
+public int hashCode() {
+return mechanisms.hashCode();
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!o.getClass().equals(ScramImage.class)) return false;
+ScramImage other = (ScramImage) o;
+return mechanisms.equals(other.mechanisms);
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+builder.append("ScramImage(");
+List sortedMechanisms = 
mechanisms.keySet().stream().sorted().collect(Collectors.toList());
+String preMechanismComma = "";
+for (ScramMechanism mechanism : sortedMechanisms) {
+builder.append(preMechanismComma).append(mechanism).append(": {");
+Map userMap = 
mechanisms.get(mechanism);
+List sortedUserNames = 
userMap.keySet().stream().sorted().collect(Collectors.toList());
+String preUserNameComma = "";
+for (String userName : sortedUserNames) {
+
builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName));

Review Comment:
   This one should be okay. It is just printing a list of usernames and which 
SCRAM mechanism they use. No passwords or salts are displayed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-09 Thread via GitHub


omkreddy commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1101769724


##
metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static 
org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+static final int MAX_ITERATIONS = 16384;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+ScramControlManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+return new ScramControlManager(logContext,
+snapshotRegistry);
+}
+}
+
+static class ScramCredentialKey {
+private final String username;
+private final ScramMechanism mechanism;
+
+ScramCredentialKey(String username, ScramMechanism mechanism) {
+this.username = username;
+this.mechanism = mechanism;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(username, mechanism);
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!(o.getClass() == this.getClass())) return false;
+ScramCredentialKey other = (ScramCredentialKey) o;
+return username.equals(other.username) &&
+mechanism.equals(other.mechanism);
+}
+
+@Override
+public String toString() {
+return "ScramCredentialKey" +
+"(username=" + username +
+", mechanism=" + mechanism +
+")";
+}
+}
+
+static class ScramCredentialValue {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+ScramCredentialValue(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+@Override
+publ

[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-09 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1101766963


##
metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static 
org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+static final int MAX_ITERATIONS = 16384;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+ScramControlManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+return new ScramControlManager(logContext,
+snapshotRegistry);
+}
+}
+
+static class ScramCredentialKey {
+private final String username;
+private final ScramMechanism mechanism;
+
+ScramCredentialKey(String username, ScramMechanism mechanism) {
+this.username = username;
+this.mechanism = mechanism;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(username, mechanism);
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!(o.getClass() == this.getClass())) return false;
+ScramCredentialKey other = (ScramCredentialKey) o;
+return username.equals(other.username) &&
+mechanism.equals(other.mechanism);
+}
+
+@Override
+public String toString() {
+return "ScramCredentialKey" +
+"(username=" + username +
+", mechanism=" + mechanism +
+")";
+}
+}
+
+static class ScramCredentialValue {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+ScramCredentialValue(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+@Override
+p

[jira] [Commented] (KAFKA-14674) Backport fix for KAFKA-14455 to 3.3 and 3.4 branches

2023-02-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686648#comment-17686648
 ] 

Marcel Stör commented on KAFKA-14674:
-

I wasn't able to find any EOL information on the 2.x release line. Hence, I 
assume it _is_ EOL and this fix won't be backported to 2.8.2?

> Backport fix for KAFKA-14455 to 3.3 and 3.4 branches
> 
>
> Key: KAFKA-14674
> URL: https://issues.apache.org/jira/browse/KAFKA-14674
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.4.1, 3.3.3
>
>
> The 3.4 branch is currently frozen for the upcoming 3.4.0 release. Once the 
> release is complete, we can and should backport the change in 
> [https://github.com/apache/kafka/pull/12984] onto the 3.3 and 3.4 branches 
> before putting out any subsequent releases on those branches.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-14698:


Assignee: Akhilesh Chaganti

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Assignee: Akhilesh Chaganti
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2

2023-02-09 Thread via GitHub


C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1424428493

   Whoops--there was a typo in the title at the time of merge. It should refer 
to [KAFKA-10586](https://issues.apache.org/jira/browse/KAFKA-10586), not 
KAFKA-15086.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686623#comment-17686623
 ] 

David Arthur commented on KAFKA-14698:
--

Yea, I agree. Any ERROR is pretty concerning from an operator perspective. 

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686618#comment-17686618
 ] 

Mickael Maison commented on KAFKA-14698:


Yeah, it seems it's just noise in the end, but if you don't know that, I 
wouldn't expect anyone to go forward with the migration when both the 
controllers and brokers spam errors!

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante closed pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2023-02-09 Thread via GitHub


C0urante closed pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the 
update operation.
URL: https://github.com/apache/kafka/pull/8656


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2023-02-09 Thread via GitHub


C0urante commented on PR #8656:
URL: https://github.com/apache/kafka/pull/8656#issuecomment-1424416521

   Closing as this issue has been addressed by 
https://github.com/apache/kafka/pull/13137 / 
[KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686615#comment-17686615
 ] 

David Arthur commented on KAFKA-14698:
--

Thanks [~mimaison]. I've uploaded the logs from the zk migration system test 
which also shows this problem. If you continue to migrate ZK brokers, do you 
see other problems besides the ERROR logs? I suspect this is just a superficial 
problem and not a correctness issue.

cc [~showuon]

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14698:
-
Attachment: test_online_migration.tar.gz

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2023-02-09 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9981.
--
Fix Version/s: 3.5.0
   Resolution: Fixed

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
> List> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request will be sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request will fail to be sent, causing the update 
> operation to be lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14698:
-
Fix Version/s: 3.5.0
   3.4.1

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2

2023-02-09 Thread via GitHub


C0urante merged PR #13137:
URL: https://github.com/apache/kafka/pull/13137


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-02-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686614#comment-17686614
 ] 

Mickael Maison commented on KAFKA-14698:


Ignoring the errors and continuing the migration steps seems to work. Once I 
restarted the controllers without migration enabled, the errors stopped.

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Attachments: broker0.log, controller.log
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14697) KRaft controller should not send LISR to KRaft brokers during migration

2023-02-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686613#comment-17686613
 ] 

Mickael Maison commented on KAFKA-14697:


Ignoring the errors and continuing the migration steps seems to work. Once I 
restarted the controllers without migration enabled, the errors stopped.

> KRaft controller should not send LISR to KRaft brokers during migration
> ---
>
> Key: KAFKA-14697
> URL: https://issues.apache.org/jira/browse/KAFKA-14697
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Priority: Major
> Attachments: test_online_migration.tar.gz
>
>
> During a migration, after a broker is restarted in KRaft mode the KRaft 
> controller is still sending it LISR as if it were a ZK broker. This results 
> in errors like:
> {code}
> [2023-02-09 14:53:26,892] ERROR Closing socket for 
> 172.19.0.4:9092-172.19.0.9:32876-0 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> {code}
> This can be observed by running the 
> kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration
>  ducktape test. Attached are logs from a local run of this test with --debug 
> set.
> It seems these errors do not affect the consistency of metadata during the 
> migration, but the excessive LISR sent out will impact performance on large 
> clusters. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

