[jira] [Created] (KAFKA-14695) broker will get LEADER_AND_ISR is not enabled error while migrating from ZK to KRaft

2023-02-08 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14695:
-

 Summary: broker will get LEADER_AND_ISR is not enabled error while migrating from ZK to KRaft
 Key: KAFKA-14695
 URL: https://issues.apache.org/jira/browse/KAFKA-14695
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.4.0
Reporter: Luke Chen
Assignee: Luke Chen


Following the docs here:

[https://kafka.apache.org/documentation/#kraft_zk_migration]

 

During the "Migrating brokers to KRaft" step, after migrating, the broker will log a lot of errors like:

 
{code:java}
org.apache.kafka.common.errors.InvalidRequestException: Received request api key LEADER_AND_ISR which is not enabled
[2023-02-08 12:06:25,776] ERROR Exception while processing request from 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
{code}
 

 

It blocks further migration.

This can be worked around by explicitly setting the listener host name. That is, instead of setting the listeners value like this (which is also the default value):

_listeners=PLAINTEXT://:9092_

it should be set as: _listeners=PLAINTEXT://localhost:9092_
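
For reference, a minimal sketch of the workaround in the broker's server.properties (the host name here is just an example for a single-machine setup; use the broker's real host name in a multi-node cluster):

{code}
# default form that triggers the LEADER_AND_ISR error during migration
#listeners=PLAINTEXT://:9092

# workaround: bind the listener to an explicit host name
listeners=PLAINTEXT://localhost:9092
{code}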

 

Will update the doc first, to unblock other users trying to migrate from ZK to KRaft, then investigate why this happens.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


satishd commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1101000469


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   This is always a static field unless it is loaded with LogContext.
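
For illustration, a minimal sketch of the two logger patterns being contrasted (class name and constructor argument are placeholders, not the PR's actual code): a plain SLF4J logger is one-per-class and conventionally static, while a LogContext logger carries a per-instance prefix and is therefore held as an instance field.

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleComponent {
    // plain SLF4J logger: one per class, conventionally a static final field
    private static final Logger LOG = LoggerFactory.getLogger(ExampleComponent.class);

    // LogContext logger: carries a per-instance prefix (e.g. a fetcher or broker id),
    // so it is created in the constructor and held as an instance field
    private final Logger contextLog;

    public ExampleComponent(final LogContext logContext) {
        this.contextLog = logContext.logger(ExampleComponent.class);
    }
}
```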



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


satishd commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100998652


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   No, this is always a static field unless it is loaded with `LogContext`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-08 Thread via GitHub


yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099951075


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
 /**
  * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
  */
-public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
 final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
 for (String alias : transformAliases) {
 final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
 try {
 @SuppressWarnings("unchecked")
 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
 Map<String, Object> configs = originalsWithPrefix(prefix);
-Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
 transformation.configure(configs);
 if (predicateAlias != null) {

Review Comment:
   The default value for `PREDICATE_CONFIG` is `""` and not `null`; it looks 
like this check currently works fine because we're using the original configs 
and not the parsed configs. I wonder if we should update the default value and 
/ or this check here to be consistent?
   
   Edit: This isn't really related to your change; just thought I'd bring it up 
since we were in the area.
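
As a hedged illustration of the consistency question (a hypothetical helper, not Connect's actual code; the real keys and defaults live in ConnectorConfig and TransformationStage):

```java
// Treat both "not set" (null in the raw originals) and the declared default ("")
// as "no predicate configured", so the check works for raw and parsed configs alike.
static boolean hasPredicate(final Object predicateAlias) {
    return predicateAlias != null && !predicateAlias.toString().trim().isEmpty();
}
```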



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100936116


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {

Review Comment:
   Why not typed?
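
A self-contained sketch of the raw-vs-typed distinction the question is getting at (placeholder names, not the store's real types):

```java
// Generic illustration only: a client interface parameterized by its segment type.
interface StoreClient<S> {
    S segmentForId(long id);
}

class WriteBufferExample {
    private final StoreClient<String> client = id -> "segment-" + id;

    // raw return type: callers lose the segment type and get unchecked warnings
    @SuppressWarnings("rawtypes")
    StoreClient rawClient() {
        return client;
    }

    // typed return type: callers keep compile-time checking of the segment type
    StoreClient<String> typedClient() {
        return client;
    }
}
```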



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100927416


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-throw new UnsupportedOperationException("not yet implemented");
+// advance stream time to the max timestamp in the batch
+for (final ConsumerRecord<byte[], byte[]> record : records) {
+observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+}
+
+final VersionedStoreClient restoreClient = restoreWriteBuffer.getClient();

Review Comment:
   Why is the generic not typed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920982


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+

[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920384


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+

[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920025


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+

[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100910560


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from 
its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write 
buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write 
efficiency during
+ * restoration.
+ * 
+ * The structure of the internals of this write buffer mirrors the structure 
of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store 
and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+// write buffer for latest value store. value type is Optional in order to track tombstones
+// which must be written to the underlying store.
+private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+// map from segment id to write buffer. segments are stored in reverse-sorted order,
+// so getReverseSegments() is more efficient
+private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+/**
+ * Creates a new write buffer.
+ * @param dbClient client for reading from and writing to the underlying persistent store
+ */
+RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+this.dbClient = Objects.requireNonNull(dbClient);
+
+this.latestValueWriteBuffer = new HashMap<>();
+// store in reverse-sorted order, to make getReverseSegments() more efficient
+this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+}
+
+/**
+ * @return client for writing to (and reading from) the write buffer
+ */
+VersionedStoreClient getClient() {
+return restoreClient;
+}
+
+/**
+ * Flushes the contents of the write buffer into the persistent store, and clears the write
+ * buffer in the process.
+ * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+ */
+void flush() throws RocksDBException {
+
+// flush segments first, as this is consistent with the store always writing to
+// older segments/stores before later ones
+try (final WriteBatch segmentsBatch = new WriteBatch()) {
+final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+if (allSegments.size() > 0) {
+// collect entries into write batch
+for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+for (final Map.Entry segmentEntry : bufferSegment.getAll()) {
+

[GitHub] [kafka] showuon merged pull request #13221: MINOR: Fix hyperlink tags in upgrade docs

2023-02-08 Thread via GitHub


showuon merged PR #13221:
URL: https://github.com/apache/kafka/pull/13221


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100908054


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -333,10 +384,34 @@ interface VersionedStoreSegment {
 long segmentIdForTimestamp(long timestamp);
 }
 
+/**
+ * A {@link VersionedStoreClient} which additionally supports batch writes into its latest
+ * value store.
+ *
+ * @param <T> the segment type used by this client
+ */
+interface BatchWritingVersionedStoreClient<T extends VersionedStoreSegment> extends VersionedStoreClient<T> {

Review Comment:
   Why do we need this interface? Can't we just add both methods to 
`RocksDBVersionedStoreClient` directly?
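
Purely to illustrate the design question (placeholder names and method signatures, not the PR's actual API): the batch-capable operations can live on a dedicated sub-interface, or be declared directly on the one concrete client that needs them.

```java
// Option A (what the PR does conceptually): a separate sub-interface
interface VersionedClient {
    void put(String key, byte[] value);
}

interface BatchWritingVersionedClient extends VersionedClient {
    void addToBatch(String key, byte[] value);
    void flushBatch();
}

// Option B (the reviewer's suggestion): declare the extra methods on the concrete client only
class RocksVersionedClient implements VersionedClient {
    @Override
    public void put(final String key, final byte[] value) { /* write directly */ }

    void addToBatch(final String key, final byte[] value) { /* stage in a batch */ }

    void flushBatch() { /* write the staged batch */ }
}
```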



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100902144


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part of an ongoing
+// put() procedure. any value which is not a valid record timestamp will do.
+private 

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100896272


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part of an ongoing
+// put() procedure. any value which is not a valid record timestamp will do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final 

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100893004


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part of an ongoing
+// put() procedure. any value which is not a valid record timestamp will do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final 

[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100892271


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part 
of an ongoing
+// put() procedure. any value which is not a valid record timestamp will 
do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final 

[GitHub] [kafka] jsancio commented on a diff in pull request #13207: KAFKA-14664; Fix inaccurate raft idle ratio metric

2023-02-08 Thread via GitHub


jsancio commented on code in PR #13207:
URL: https://github.com/apache/kafka/pull/13207#discussion_r1100891108


##
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##
@@ -133,26 +131,27 @@ public KafkaRaftMetrics(Metrics metrics, String 
metricGrpPrefix, QuorumState sta
 "The average number of records appended per sec as the leader 
of the raft quorum."),
 new Rate(TimeUnit.SECONDS, new WindowedSum()));
 
-this.pollIdleSensor = metrics.sensor("poll-idle-ratio");
-this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg",
+this.pollDurationSensor = metrics.sensor("poll-idle-ratio");
+this.pollDurationSensor.add(metrics.metricName(
+"poll-idle-ratio-avg",
 metricGroupName,
-"The average fraction of time the client's poll() is idle as 
opposed to waiting for the user code to process records."),
-new Avg());
+"The ratio of time the Raft IO thread is idle as opposed to " +
+"doing work (e.g. handling requests or replicating from 
the leader)"
+),
+new TimeRatio(1.0)
+);
 }
 
 public void updatePollStart(long currentTimeMs) {
-if (pollEndMs.isPresent() && pollStartMs.isPresent()) {
-long pollTimeMs = Math.max(pollEndMs.getAsLong() - 
pollStartMs.getAsLong(), 0L);
-long totalTimeMs = Math.max(currentTimeMs - 
pollStartMs.getAsLong(), 1L);
-this.pollIdleSensor.record(pollTimeMs / (double) totalTimeMs, 
currentTimeMs);
-}
-
 this.pollStartMs = OptionalLong.of(currentTimeMs);
-this.pollEndMs = OptionalLong.empty();
 }
 
 public void updatePollEnd(long currentTimeMs) {
-this.pollEndMs = OptionalLong.of(currentTimeMs);
+if (pollStartMs.isPresent()) {
+long pollDurationMs = Math.max(currentTimeMs - 
pollStartMs.getAsLong(), 0L);

Review Comment:
   Hmm. `KafkaRaftClient` uses `Time.milliseconds`. I think this would be true if it 
used `Time.hiResClockMs`, no?
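
   For context on the metric rework being discussed: the new sensor records each 
measured poll duration and reports it as a ratio of recorded time to elapsed 
wall-clock time, rather than averaging per-poll fractions. A minimal, 
self-contained sketch of that idea, with hypothetical names (this is not the 
`TimeRatio` class in the PR):

{code:java}
// Hypothetical sketch only: accumulate recorded durations in a window and
// report them as a fraction of the wall-clock time elapsed in that window.
class SimpleTimeRatio {
    private long windowStartMs = -1;  // start of the current measurement window
    private long recordedMs = 0;      // total duration recorded in this window

    void record(long durationMs, long nowMs) {
        if (windowStartMs < 0) {
            windowStartMs = nowMs - durationMs;
        }
        recordedMs += durationMs;
    }

    // Fraction of the window covered by recorded durations; resets the window.
    double measure(long nowMs) {
        long windowMs = Math.max(nowMs - windowStartMs, 1);
        double ratio = Math.min((double) recordedMs / windowMs, 1.0);
        windowStartMs = nowMs;
        recordedMs = 0;
        return ratio;
    }
}
{code}

   Dividing raw durations by the window length weights long and short polls 
proportionally, which is why averaging per-poll fractions tends to misreport the 
idle ratio.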



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100890258


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part 
of an ongoing
+// put() procedure. any value which is not a valid record timestamp will 
do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final 
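
   As a plain illustration of the validFrom/validTo semantics described in the 
Javadoc quoted above, here is a toy in-memory sketch. This is not the 
RocksDB-backed store under review; the timestamp-keyed map below is only 
illustrative of the lookup behaviour.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Toy illustration of validFrom/validTo semantics only; not the RocksDB-backed store.
class VersionedMapSketch<V> {
    // validFrom timestamp -> value; a null value models a tombstone (delete)
    private final TreeMap<Long, V> versions = new TreeMap<>();

    void put(long timestamp, V value) {
        // validTo of the previous version implicitly becomes this timestamp
        versions.put(timestamp, value);
    }

    V get(long asOfTimestamp) {
        // latest version whose validFrom <= asOfTimestamp
        Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedMapSketch<String> store = new VersionedMapSketch<>();
        store.put(10L, "v1");   // v1 valid [10, 20) once v2 arrives
        store.put(20L, "v2");   // v2 valid [20, 30) once the tombstone arrives
        store.put(30L, null);   // tombstone
        System.out.println(store.get(15L)); // v1
        System.out.println(store.get(25L)); // v2
        System.out.println(store.get(35L)); // null
    }
}
{code}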

[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch 
request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method returned with or 
without error. It's mentioned in the documentation description, following the 
convention of the other handler functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch 
request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method returned with or 
without error. It's mentioned in the documentation, but I'll update it to be 
more noticeable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch 
request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method returned with or 
without error. I'll update the documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100802687


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = 
LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   I'll remove the static.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100799705


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only 
the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+private LeaderEndPoint leader;
+private ReplicaManager replicaMgr;
+private Integer fetchBackOffMs;

Review Comment:
   Sorry, I thought I had it removed in my previous commits, but I guess not. 
I'll pick it up in the next commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14694) RPCProducerIdManager should not wait for a new block

2023-02-08 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-14694:


 Summary: RPCProducerIdManager should not wait for a new block
 Key: KAFKA-14694
 URL: https://issues.apache.org/jira/browse/KAFKA-14694
 Project: Kafka
  Issue Type: Bug
Reporter: Jeff Kim
Assignee: Jeff Kim


RPCProducerIdManager initiates an async request to the controller to grab a 
block of producer IDs and then blocks waiting for a response from the 
controller.

This is done in the request handler threads while holding a global lock. This 
means that if many producers are requesting producer IDs and the controller is 
slow to respond, many threads can get stuck waiting for the lock.

This may also be a deadlock concern under the following scenario:

If the controller has 1 request handler thread (1 chosen for simplicity) and 
receives an InitProducerId request, it may deadlock.
Basically, any time the controller has N concurrent InitProducerId requests, where 
N >= the number of request handler threads, there is the potential for deadlock.

Consider this:
1. The request handler thread tries to handle an InitProducerId request to the 
controller by forwarding an AllocateProducerIds request.
2. The request handler thread then waits on the controller response (timed poll 
on nextProducerIdBlock).
3. The controller's request handler threads need to pick this request up and 
handle it, but those threads are blocked waiting for the forwarded 
AllocateProducerIds response.

 

We should not block while waiting for a new block and instead return 
immediately to free the request handler threads.
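
A rough sketch of that direction, with hypothetical names (this is not the 
RPCProducerIdManager API): the request handler never waits; if no block is cached 
it triggers an asynchronous AllocateProducerIds request and returns a retriable 
error so the client backs off and retries.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only: hypothetical class and method names.
class NonBlockingProducerIdManager {

    static final class ProducerIdBlock {
        final long firstId;
        final long size;
        final AtomicLong next;
        ProducerIdBlock(long firstId, long size) {
            this.firstId = firstId;
            this.size = size;
            this.next = new AtomicLong(firstId);
        }
    }

    static final class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    private final AtomicReference<ProducerIdBlock> currentBlock = new AtomicReference<>();
    private final AtomicBoolean fetchInFlight = new AtomicBoolean(false);

    long generateProducerId() {
        ProducerIdBlock block = currentBlock.get();
        if (block != null) {
            long id = block.next.getAndIncrement();
            if (id < block.firstId + block.size) {
                return id;
            }
        }
        maybeRequestNextBlock();
        // Return control to the request handler immediately instead of waiting
        // for the controller's AllocateProducerIds response.
        throw new RetriableException("No producer ID block available yet; retry");
    }

    private void maybeRequestNextBlock() {
        if (fetchInFlight.compareAndSet(false, true)) {
            // Hypothetical async call: the response handler would install the new
            // block via currentBlock.set(...) and then clear fetchInFlight.
        }
    }
}
{code}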



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722811


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1529,6 +1539,11 @@ private void resetToEmptyState() {
  */
 private final ReplicationControlManager replicationControl;
 
+/**
+ * Manages SCRAM credentials, if there are any.
+ */
+private final ScramControlManager scramControl;

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722578


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -53,7 +53,8 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, 
TopicPartition, Uuid}
-import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, 
ProducerIdsImage, TopicsDelta, TopicsImage}
+import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, FeaturesImage,

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100722376


##
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala:
##
@@ -85,15 +113,17 @@ class AlterUserScramCredentialsRequestTest extends 
BaseRequestTest {
   .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
 )
 requests.foreach(request => {
-  val response = sendAlterUserScramCredentialsRequest(request)
+  val response = sendAlterUserScramCredentialsRequest(request, 
adminSocketServer)
   val results = response.data.results
   assertEquals(2, results.size)
   checkAllErrorsAlteringCredentials(results, Errors.DUPLICATE_RESOURCE, 
"when altering the same credential twice in a single request")
 })
   }
 
-  @Test
-  def testAlterEmptyUser(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testAlterEmptyUser(quorum: String): Unit = {
+  println("Starting test")

Review Comment:
   Fix
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-08 Thread via GitHub


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1100705771


##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -212,24 +248,32 @@ public static ApiVersionCollection 
collectApis(Set apiKeys) {
  * @param listenerType the listener type which constrains the set of 
exposed APIs
  * @param minRecordVersion min inter broker magic
  * @param activeControllerApiVersions controller ApiVersions
+ * @param enableUnstableLastVersion whether unstable versions should be 
advertised or not

Review Comment:
   Do we have a place where all this new unstable API usage is documented? It 
could be a bit confusing for folks who didn't take a look at this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100690761


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() 
throws InterruptedExceptio
 Map translatedOffsets = 
backupClient.remoteConsumerOffsets(
 consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
 return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
-   translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));

Review Comment:
   Hmm... if it's only a matter of reading to the end of the topic and then 
committing offsets, I think I'd prefer to have more coverage in this test. We 
do have assertions for regular checkpointing/syncing logic in other tests, but 
AFAICT we don't have anything to explicitly test the transition for a single 
consumer group from not being synced (even though other groups are being 
synced) to being synced.
   
   If we do have coverage for that somewhere else, then let me know and we can 
resolve this comment with no further action.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13659) MM2 should read all offset syncs at start up

2023-02-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686112#comment-17686112
 ] 

Chris Egerton commented on KAFKA-13659:
---

[~dorwi] thanks for filing this issue. I've removed part of the description 
that refers to syncing consumer offsets on the downstream cluster with offsets 
beyond the end of the log, since that's already captured with KAFKA-12468 and a 
corresponding [PR for that issue|https://github.com/apache/kafka/pull/13178].

 

The remaining part of this ticket is still worth pursuing, and I'd be 
interested in seeing if we can merge a fix for it. Are you still interested in 
pursuing this ticket, or would it be okay to reassign to someone else?

> MM2 should read all offset syncs at start up
> 
>
> Key: KAFKA-13659
> URL: https://issues.apache.org/jira/browse/KAFKA-13659
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Kanalas Vidor
>Assignee: Kanalas Vidor
>Priority: Major
>
> MirrorCheckpointTask uses OffsetSyncStore, and does not check whether 
> OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the endoffset of the topic at startup, and set a 
> flag when it finally reaches the endoffset in consumption. 
> MirrorCheckpointTask.poll should wait for this flag to be true before doing 
> any in-memory updates and group offset management.
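
A minimal sketch of the approach described above, assuming the consumer is already 
assigned to the offset-syncs partition (illustrative names, not the MirrorMaker 2 
code): record the end offset at startup and only report the store as ready once 
consumption has caught up.

{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the "read to the end before translating" idea.
class OffsetSyncReadiness {
    private final Consumer<byte[], byte[]> consumer;
    private final TopicPartition offsetSyncsPartition;
    private final long endOffsetAtStartup;
    private volatile boolean readToEnd = false;

    OffsetSyncReadiness(Consumer<byte[], byte[]> consumer, TopicPartition offsetSyncsPartition) {
        this.consumer = consumer;
        this.offsetSyncsPartition = offsetSyncsPartition;
        // Capture the end offset once, at startup.
        this.endOffsetAtStartup = consumer
                .endOffsets(Collections.singleton(offsetSyncsPartition))
                .get(offsetSyncsPartition);
    }

    void poll() {
        consumer.poll(Duration.ofMillis(500)); // process offset sync records here
        if (!readToEnd && consumer.position(offsetSyncsPartition) >= endOffsetAtStartup) {
            readToEnd = true; // safe to start translating / committing group offsets
        }
    }

    boolean ready() {
        return readToEnd;
    }
}
{code}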



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13659) MM2 should read all offset syncs at start up

2023-02-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13659:
--
Summary: MM2 should read all offset syncs at start up  (was: MM2 should 
read all offset syncs at start up and should not set consumer offset higher 
than the end offset)

> MM2 should read all offset syncs at start up
> 
>
> Key: KAFKA-13659
> URL: https://issues.apache.org/jira/browse/KAFKA-13659
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Kanalas Vidor
>Assignee: Kanalas Vidor
>Priority: Major
>
> MirrorCheckpointTask uses OffsetSyncStore, and does not check whether 
> OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the endoffset of the topic at startup, and set a 
> flag when it finally reaches the endoffset in consumption. 
> MirrorCheckpointTask.poll should wait for this flag to be true before doing 
> any in-memory updates and group offset management.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13659) MM2 should read all offset syncs at start up and should not set consumer offset higher than the end offset

2023-02-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13659:
--
Description: MirrorCheckpointTask uses OffsetSyncStore, and does not check 
whether OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
OffsetSyncStore should fetch the endoffset of the topic at startup, and set a 
flag when it finally reaches the endoffset in consumption. 
MirrorCheckpointTask.poll should wait for this flag to be true before doing any 
in-memory updates and group offset management.  (was: - MirrorCheckpointTask 
uses OffsetSyncStore, and does not check whether OffsetSyncStore managed to 
read to the "end" of the offset-syncs topic. OffsetSyncStore should fetch the 
endoffset of the topic at startup, and set a flag when it finally reaches the 
endoffset in consumption. MirrorCheckpointTask.poll should wait for this flag 
to be true before doing any in-memory updates and group offset management.
 - MirrorCheckpointTask can create checkpoints which point into the "future" - 
meaning it sometimes translates consumer offsets in a way that the target 
offset is greater than the endoffset of the replica topic partition. 
MirrorCheckpointTask should fetch the endoffsets of the affected topics, and 
make sure that it does not try to set the consumer offset to anything higher 
than the endoffset.)

> MM2 should read all offset syncs at start up and should not set consumer 
> offset higher than the end offset
> --
>
> Key: KAFKA-13659
> URL: https://issues.apache.org/jira/browse/KAFKA-13659
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Kanalas Vidor
>Assignee: Kanalas Vidor
>Priority: Major
>
> MirrorCheckpointTask uses OffsetSyncStore, and does not check whether 
> OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the endoffset of the topic at startup, and set a 
> flag when it finally reaches the endoffset in consumption. 
> MirrorCheckpointTask.poll should wait for this flag to be true before doing 
> any in-memory updates and group offset management.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100680887


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
 
 produceMessages(primary, "test-topic-1");
 
-ReplicationPolicy replicationPolicy = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-String remoteTopic = 
replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-// Check offsets are pushed to the checkpoint topic
-Consumer backupConsumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
-waitForCondition(() -> {
-ConsumerRecords records = 
backupConsumer.poll(Duration.ofSeconds(1L));
-for (ConsumerRecord record : records) {
-Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-return true;
-}
-}
-return false;
-}, 30_000,
-"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + 
".test-topic-1"
-);

Review Comment:
   I think that makes sense.
   I'll add some explicitly committed offsets here as a quick fix, and re-add 
the assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100676199


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
 
 produceMessages(primary, "test-topic-1");
 
-ReplicationPolicy replicationPolicy = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-String remoteTopic = 
replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-// Check offsets are pushed to the checkpoint topic
-Consumer backupConsumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
-waitForCondition(() -> {
-ConsumerRecords records = 
backupConsumer.poll(Duration.ofSeconds(1L));
-for (ConsumerRecord record : records) {
-Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-return true;
-}
-}
-return false;
-}, 30_000,
-"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + 
".test-topic-1"
-);

Review Comment:
   Thanks for the explanation, that clears things up nicely.
   
   > Upon further inspection, maybe this test makes sense to turn into a 
parameter for other tests, to verify that the functionality of the checkpoints 
when the syncs topic is moved around. Does that make sense to do in this PR?
   
   I'm hesitant to drop this coverage from the test, because it seems at least 
as useful to verify that offset sync records are published in the expected 
location (and translated to consumer offsets) as it is to verify that they 
aren't published in the incorrect location. My personal preference would be to 
add the minimal additional logic necessary to this test to keep that coverage, 
and then pursue parameterization of offset sync location out of band. I'm also 
not even sure that parameterization would be worth the tradeoff in build/test 
time, although we might be able to find a healthy compromise there.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423233290

   > Oh, and the Jenkins build seems to be consistently failing on the 
IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored 
test case. Probably want to look into that before we merge 
   
   It looks like this was failing due to a typo in my offset.flush.interval.ms 
overrides. This should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100667177


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
 
 // create a consumer at primary cluster to consume the new topic
 try (Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   Ah, now I see (was confusing the total end offsets with the total consumer 
group offsets). Thanks for the explanation!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100654263


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition 
sourceTopicPartition, long upstr
 // Offset is too far in the past to translate accurately
 return OptionalLong.of(-1L);
 }
-long upstreamStep = upstreamOffset - 
offsetSync.get().upstreamOffset();
-return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
+return OptionalLong.of(offsetSync.get().downstreamOffset() + 
(offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   IMO the diagram is a bit overkill, but I tend not to be a very visual 
thinker. It certainly doesn't do any harm; I'm fine with leaving it in. Thanks 
for adding detail here, hopefully it'll come in handy if anyone needs to 
revisit this logic in the future.
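
   For readers skimming the thread, a small worked example of the translation rule 
in the diff above, as I read it (constants are illustrative): translation is exact 
only when the consumer offset equals the synced upstream offset, and is otherwise 
clamped to one past the synced downstream offset instead of extrapolating the gap.

{code:java}
// Worked example of the translation rule; constants are illustrative only.
class TranslationExample {
    static final long SYNC_UPSTREAM = 100L;
    static final long SYNC_DOWNSTREAM = 80L;

    static long translateDownstream(long upstreamOffset) {
        // Exact only when the consumer offset equals the synced upstream offset;
        // otherwise we only know the consumer is past the sync, so return the
        // synced downstream offset + 1 rather than extrapolating the gap.
        return SYNC_DOWNSTREAM + (SYNC_UPSTREAM == upstreamOffset ? 0 : 1);
    }

    public static void main(String[] args) {
        System.out.println(translateDownstream(100L)); // 80 (exact match)
        System.out.println(translateDownstream(150L)); // 81 (not 130)
    }
}
{code}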



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14674) Backport fix for KAFKA-14455 to 3.3 and 3.4 branches

2023-02-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14674.
---
Resolution: Fixed

Backported to 
[3.3|https://github.com/apache/kafka/commit/2aeceff25a4a6a0581adf779b77cd94838b4e224]
 and 
[3.4|https://github.com/apache/kafka/commit/e35ade9fb9d34f75b102ea2371db4d137866d464].

> Backport fix for KAFKA-14455 to 3.3 and 3.4 branches
> 
>
> Key: KAFKA-14674
> URL: https://issues.apache.org/jira/browse/KAFKA-14674
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.4.1, 3.3.3
>
>
> The 3.4 branch is currently frozen for the upcoming 3.4.0 release. Once the 
> release is complete, we can and should backport the change in 
> [https://github.com/apache/kafka/pull/12984] onto the 3.3 and 3.4 branches 
> before putting out any subsequent releases on those branches.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-02-08 Thread via GitHub


C0urante commented on PR #12984:
URL: https://github.com/apache/kafka/pull/12984#issuecomment-1423192295

   Finished backporting to 3.3 and 3.4, now that both active releases on those 
branches have concluded.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423176443

   > I notice that we don't do a read-to-end of the offset syncs topic in 
MirrorCheckpointTask before we begin syncing consumer group offsets, and we 
begin reading that topic from the beginning. This may cause us to sync offsets 
based on stale checkpoints if there are later checkpoints available in the 
topic that we haven't consumed yet. Do you think it might make sense to add a 
read-to-end for the offset syncs topic before we begin syncing consumer group 
offsets in the checkpoint connector?
   
   Ouch, yeah, that is certainly an issue that gets worse with my change.
   Before, the checkpoints would be non-monotonic for transactional/compacted 
topics; after, they're non-monotonic for everyone.
   I think addressing this in a follow-up is a smart idea; this change is 
already messy enough.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100622077


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition 
sourceTopicPartition, long upstr
 // Offset is too far in the past to translate accurately
 return OptionalLong.of(-1L);
 }
-long upstreamStep = upstreamOffset - 
offsetSync.get().upstreamOffset();
-return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
+return OptionalLong.of(offsetSync.get().downstreamOffset() + 
(offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   Let me know if this description makes sense to you, and if the diagram adds 
any value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
 int cnt = 0;
 for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
 for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster, topicName, p, "key", "value-" + cnt++);
 }
-
+
+protected void produce(EmbeddedConnectCluster cluster, String topic, 
Integer partition, String key, String value) {
+cluster.kafka().produce(topic, partition, key, value);
+}
+
+protected static Map 
waitForAnyCheckpoint(

Review Comment:
   It's `any` in contrast to the other method's `FullSync`.
   As in, it accepts checkpoints anywhere within a topic, not just at the end 
of the topic.
   
   I'll update the method name to be more precise and verbose :)



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
 // have been automatically synchronized from primary to backup by the 
background job, so no
 // more records to consume from the replicated topic by the same 
consumer group at backup cluster
 assertEquals(0, records.count(), "consumer record size is not zero");
+backupConsumer.close();

Review Comment:
   I code-golfed this one, but there's no reason to. Updated.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() 
throws InterruptedExceptio
 Map translatedOffsets = 
backupClient.remoteConsumerOffsets(
 consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
 return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
-   translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));

Review Comment:
   1. This was passing before, because `offset.lag.max` was nonzero, leaving a 
(0,0) offset sync in the topic.
   2. This condition is part of 
`waitForAnyCheckpoint/waitForCheckpointOnAllPartitions` which is used in 
`testReplication`. I think that test should be sufficient to cover the normal 
case where offsets are translated.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
 
 produceMessages(primary, "test-topic-1");
 
-ReplicationPolicy replicationPolicy = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-String remoteTopic = 
replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-// Check offsets are pushed to the checkpoint topic
-Consumer backupConsumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
-waitForCondition(() -> {
-ConsumerRecords records = 
backupConsumer.poll(Duration.ofSeconds(1L));
-for (ConsumerRecord record : records) {
-Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-return true;
-}
-}
-return false;
-}, 30_000,
-"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + 
".test-topic-1"
-);

Review Comment:
   This assertion relied on a nonzero `offset.lag.max`, where the sync topic 
could be left with a (0,0) record.
   The offset being translated in this case is the `consumer-group-dummy` from 
the setup method, which has offset 0 (at the beginning of the topic). Now that 
`offset.lag.max` is zero and the starvation bug is fixed, the sync topic may 
only see nonzero syncs, which are not able to translate 0 offsets.
   
   I removed this assertion because it was failing and didn't seem to concern 
this test.
   Upon further inspection, maybe this test makes sense to turn into a 
parameter for other tests, to verify the functionality of the checkpoints 
when the syncs topic is moved around. Does that make sense to do in this PR?



##

[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1423159660

   Thanks Mickael, I've addressed your suggestions and rebased.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100515746


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = 
LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   This seems a bit weird. The log typically is an instance-level object.



##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100593799


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -238,7 +238,7 @@ public KafkaClusterTestKit build() throws Exception {
 bootstrapMetadata);
 } catch (Throwable e) {
 log.error("Error creating controller {}", node.id(), 
e);
-Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController());
+Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController(), null);

Review Comment:
   ah! intended to do that. Have done it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100593578


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -998,15 +998,18 @@ public static void closeAll(Closeable... closeables) 
throws IOException {
 throw exception;
 }
 
-public static void swallow(
-Logger log,
-String what,
-Runnable runnable
-) {
-try {
-runnable.run();
-} catch (Throwable e) {
-log.warn("{} error", what, e);
+/**
+ * Run the supplied code. If an exception is thrown, it is swallowed and 
registered to the firstException parameter.
+ */
+public static void swallow(Logger log, String what, final Runnable code, final AtomicReference<Throwable> firstException) {
+if (code != null) {
+try {
+code.run();
+} catch (Throwable t) {
+log.warn("{} error", what, t);

Review Comment:
   Fair point. Though not all "swallows" will require the error level. So, I created a generic logging function based on `CoreUtils.swallow` and then used error for this specific instance of closing the fetcher and coordinator. I have also updated the comment.
   
   Let me know if that looks right? Happy to change it further.
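   For context, a fragment sketching the close path this enables (not the actual
   KafkaConsumer code; `fetcher`, `coordinator` and `closeTimer` stand in for the
   real fields), based on the `swallow(...)` signature in this diff:
   ```java
   AtomicReference<Throwable> firstException = new AtomicReference<>();
   Utils.swallow(log, "fetcher close", () -> fetcher.close(closeTimer), firstException);
   Utils.swallow(log, "coordinator close", () -> coordinator.close(closeTimer), firstException);
   // After every close step has run, surface only the first recorded failure.
   Throwable t = firstException.get();
   if (t != null) {
       throw new KafkaException("Failed to close consumer cleanly", t);
   }
   ```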



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100592340


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2455,7 +2454,7 @@ private void close(Duration timeout, boolean 
swallowException) {
 closeTimer.reset(remainingDurationInTimeout);
 
 // This is a blocking call bound by the time remaining in 
closeTimer
-LambdaUtils.swallow(() -> fetcher.close(closeTimer), 
firstException);
+Utils.swallow(log, " fetcher close", () -> 
fetcher.close(closeTimer), firstException);

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100587361


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+@Context
+private UriInfo uriInfo;
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalMirrorResource.class);
+
+private final Map<SourceAndTarget, Herder> herders;
+
+public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+super(restClient);
+this.herders = herders;
+}
+
+@Override
+protected Herder herderForRequest() {
+String source = pathParam("source");
+String target = pathParam("target");
+Herder result = herders.get(new SourceAndTarget(source, target));
+if (result == null) {
+log.debug("Failed to find herder for source '{}' and target '{}'", 
source, target);

Review Comment:
   I was under the impression that throwing this would only cause the exception 
message to be present in the REST response (and presumably the logs for the 
worker that issued the request), but after revisiting the 
`ConnectExceptionMapper` class, it appears that we'll also log the exception at 
debug level on this worker. So yep, we can remove this 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100585212


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+@Context
+private UriInfo uriInfo;
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalMirrorResource.class);
+
+private final Map<SourceAndTarget, Herder> herders;
+
+public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+super(restClient);
+this.herders = herders;
+}
+
+@Override
+protected Herder herderForRequest() {
+String source = pathParam("source");
+String target = pathParam("target");
+Herder result = herders.get(new SourceAndTarget(source, target));
+if (result == null) {
+log.debug("Failed to find herder for source '{}' and target '{}'", 
source, target);
+throw new NotFoundException("No replication flow found for source 
'" + source + "' and target '" + target + "'");
+}
+return result;
+}
+
+private String pathParam(String name) {
+List<String> result = uriInfo.getPathParameters().get(name);
+if (result == null || result.isEmpty())
+throw new NotFoundException();

Review Comment:
   Yeah, couldn't hurt. This is almost guaranteed to end up in a log file on 
another worker so in the unlikely event that this code path is somehow reached, 
more detail would be preferable to less.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100580647


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.clients.admin.ScramMechanism;
+
+// import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+// import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the SCRAM credentials in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramImage {
+public static final ScramImage EMPTY = new 
ScramImage(Collections.emptyMap());
+
+private final Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms;
+
+public ScramImage(Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms) {
+this.mechanisms = Collections.unmodifiableMap(mechanisms);
+}
+
+public void write(ImageWriter writer, ImageWriterOptions options) {
+for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : mechanisms.entrySet()) {
+for (Entry<String, ScramCredentialData> userEntry : mechanismEntry.getValue().entrySet()) {
+writer.write(0, 
userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey()));
+}
+}
+}
+
+public Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms() {
+return mechanisms;
+}
+
+public boolean isEmpty() {
+return mechanisms.isEmpty();
+}
+
+@Override
+public int hashCode() {
+return mechanisms.hashCode();
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!o.getClass().equals(ScramImage.class)) return false;
+ScramImage other = (ScramImage) o;
+return mechanisms.equals(other.mechanisms);
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+builder.append("ScramImage(");
+List<ScramMechanism> sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList());
+String preMechanismComma = "";
+for (ScramMechanism mechanism : sortedMechanisms) {
+builder.append(preMechanismComma).append(mechanism).append(": {");
+Map<String, ScramCredentialData> userMap = mechanisms.get(mechanism);
+List<String> sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList());
+String preUserNameComma = "";
+for (String userName : sortedUserNames) {
+
builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName));

Review Comment:
   As far as I can tell, the toString method is never used.
   I think it is useful for debugging, but I'd like to know how we handle this 
in general.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100579983


##
metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Represents the ACLs in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramCredentialData {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+static ScramCredentialData fromRecord(
+UserScramCredentialRecord record
+) {
+return new ScramCredentialData(
+record.salt(),
+record.saltedPassword(),
+record.iterations());
+}
+
+public ScramCredentialData(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+public byte[] salt() {
+return salt;
+}
+
+public byte[] saltedPassword() {
+return saltedPassword;
+}
+
+public int iterations() {
+return iterations;
+}
+
+public UserScramCredentialRecord toRecord(
+String userName,
+ScramMechanism mechanism
+) {
+return new UserScramCredentialRecord().
+setName(userName).
+setMechanism(mechanism.type()).
+setSalt(salt).
+setSaltedPassword(saltedPassword).
+setIterations(iterations);
+}
+
+public ScramCredential toCredential(
+ScramMechanism mechanism
+) throws GeneralSecurityException {
+org.apache.kafka.common.security.scram.internals.ScramMechanism 
internalMechanism =
+
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
+ScramFormatter formatter = new ScramFormatter(internalMechanism);
+return new ScramCredential(salt,
+formatter.storedKey(formatter.clientKey(saltedPassword)),
+formatter.serverKey(saltedPassword),
+iterations);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(salt, saltedPassword, iterations);
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!o.getClass().equals(ScramCredentialData.class)) return false;
+ScramCredentialData other = (ScramCredentialData) o;
+return Arrays.equals(salt, other.salt) &&
+Arrays.equals(saltedPassword, other.saltedPassword) &&
+iterations == other.iterations;
+}
+
+@Override
+public String toString() {
+return "ScramCredentialData" +

Review Comment:
   As far as I can tell, the toString method is never used.
   I think it is useful for debugging, but I'd like to know how we handle this 
in general.
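   Separately, for anyone reading along, a short sketch of how the conversion
   methods quoted above fit together (the salt and salted password are assumed to be
   produced elsewhere, e.g. by `ScramFormatter`; the user name and iteration count
   are placeholders):
   ```java
   static void roundTrip(byte[] salt, byte[] saltedPassword) throws GeneralSecurityException {
       ScramCredentialData data = new ScramCredentialData(salt, saltedPassword, 4096);
       // The record form that gets written into the metadata log...
       UserScramCredentialRecord record = data.toRecord("alice", ScramMechanism.SCRAM_SHA_256);
       // ...and the credential form the broker uses to authenticate clients.
       ScramCredential credential = data.toCredential(ScramMechanism.SCRAM_SHA_256);
   }
   ```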



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100576351


##
metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static 
org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+static final int MAX_ITERATIONS = 16384;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+ScramControlManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+return new ScramControlManager(logContext,
+snapshotRegistry);
+}
+}
+
+static class ScramCredentialKey {
+private final String username;
+private final ScramMechanism mechanism;
+
+ScramCredentialKey(String username, ScramMechanism mechanism) {
+this.username = username;
+this.mechanism = mechanism;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(username, mechanism);
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!(o.getClass() == this.getClass())) return false;
+ScramCredentialKey other = (ScramCredentialKey) o;
+return username.equals(other.username) &&
+mechanism.equals(other.mechanism);
+}
+
+@Override
+public String toString() {
+return "ScramCredentialKey" +
+"(username=" + username +
+", mechanism=" + mechanism +
+")";
+}
+}
+
+static class ScramCredentialValue {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+ScramCredentialValue(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+@Override
+

[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100573646


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -221,6 +223,21 @@ class BrokerMetadataPublisher(
   s"quotas in ${deltaName}", t)
   }
 
+  // Apply changes to SCRAM credentials.
+  Option(delta.scramDelta()).foreach { scramDelta =>

Review Comment:
   The credentials are added to the metadata log and then consumed by the 
brokers and added to the broker cache.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown

2023-02-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14693:
---
Description: 
h1. Problem

When the KRaft controller encounters an error that it cannot handle, it calls 
{{ProcessExitingFaultHandler}}, which calls {{Exit.exit}}, which calls 
{{{}Runtime.exit{}}}.

Based on the Runtime.exit documentation:
{quote}All registered [shutdown 
hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-],
 if any, are started in some unspecified order and allowed to run concurrently 
until they finish. Once this is done the virtual machine 
[halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-].
{quote}
One of the shutdown hooks registered by Kafka is {{{}Server.shutdown(){}}}. 
This shutdown hook eventually calls {{{}KafkaEventQueue.close{}}}. This last 
close method joins on the controller thread. Unfortunately, the controller 
thread has also joined, waiting for the shutdown hook thread to finish.

Here are sample thread stacks:
{code:java}
   "QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms 
elapsed=620807.49s allocated=38544M defined_classes=353 tid=0x7f5aeb31f800 
nid=0x80c in Object.wait()  [0x7f5a658fb000]
 java.lang.Thread.State: WAITING (on object monitor)


at java.lang.Object.wait(java.base@17.0.5/Native Method)
  - waiting on 
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
  - locked <0xa29241f8> (a 
org.apache.kafka.common.utils.KafkaThread)
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
  at 
java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107)
  at 
java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46)
  at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130)
  at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173)
  - locked <0xffe020b8> (a java.lang.Class for 
java.lang.Shutdown)
  at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115)
  at java.lang.System.exit(java.base@17.0.5/System.java:1860)
  at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
  at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
  at org.apache.kafka.common.utils.Exit.exit(Exit.java:62)
  at 
org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54)
  at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891)
  at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874)
  at 
org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969){code}
and
{code:java}
  "kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s 
allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in 
Object.wait()  [0x7f5a47afd000]
 java.lang.Thread.State: WAITING (on object monitor)
  at java.lang.Object.wait(java.base@17.0.5/Native Method)
  - waiting on 
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
  - locked <0xa272bcb0> (a 
org.apache.kafka.common.utils.KafkaThread)
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
  at 
org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509)
  at 
org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553)
  at kafka.server.ControllerServer.shutdown(ControllerServer.scala:521)
  at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184)
  at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99)
  at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown 
Source)
  at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
  at kafka.Kafka$$$Lambda$407/0x000800fb9a10.run(Unknown Source)
  at java.lang.Thread.run(java.base@17.0.5/Thread.java:833)
  at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) 
{code}
h1. Possible Solution

A possible solution is to have the controller's unhandled fault handler call 
{{Runtime.halt}} instead of {{{}Runtime.exit{}}}.
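
As a rough sketch (illustrative only, not the actual patch, and assuming the same {{FaultHandler}} shape that {{ProcessExitingFaultHandler}} implements), the handler would halt instead of exit so that no shutdown hooks run:
{code:java}
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessHaltingFaultHandler implements FaultHandler {
    private static final Logger log = LoggerFactory.getLogger(ProcessHaltingFaultHandler.class);

    @Override
    public RuntimeException handleFault(String failureMessage, Throwable cause) {
        log.error("Encountered fatal fault: {}", failureMessage, cause);
        // Runtime.halt does not run shutdown hooks, so it cannot join on the
        // shutdown hook thread and deadlock the way Runtime.exit can.
        Runtime.getRuntime().halt(1);
        return null; // unreachable
    }
}
{code}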

  was:
h1. Problem

When the kraft controller encounters an error that it cannot handle it calls 
`ProcessExitingFaultHandler` which calls `Exit.exit` which calls `Runtime.exit`.

Based on the Runtime.exit documentation:

> All registered [shutdown 

[jira] [Created] (KAFKA-14693) KRaft Controller and ProcessExitingFaultHandler can deadlock shutdown

2023-02-08 Thread Jira
José Armando García Sancio created KAFKA-14693:
--

 Summary: KRaft Controller and ProcessExitingFaultHandler can 
deadlock shutdown
 Key: KAFKA-14693
 URL: https://issues.apache.org/jira/browse/KAFKA-14693
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 3.4.0
Reporter: José Armando García Sancio
 Fix For: 3.4.1


h1. Problem

When the kraft controller encounters an error that it cannot handle it calls 
`ProcessExitingFaultHandler` which calls `Exit.exit` which calls `Runtime.exit`.

Based on the Runtime.exit documentation:

> All registered [shutdown 
>hooks|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-],
> if any, are started in some unspecified order and allowed to run concurrently 
>until they finish. Once this is done the virtual machine 
>[halts|https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#halt-int-].

One of the shutdown hooks registered by Kafka is `Server.shutdown()`. This 
shutdown hook eventually calls `KafkaEventQueue.close`. This last close method 
joins on the controller thread. Unfortunately, the controller thread also 
joined waiting for the shutdown hook thread to finish.

Here are sample thread stacks:
{code:java}
   "QuorumControllerEventHandler" #45 prio=5 os_prio=0 cpu=429352.87ms 
elapsed=620807.49s allocated=38544M defined_classes=353 tid=0x7f5aeb31f800 
nid=0x80c in Object.wait()  [0x7f5a658fb000]
 java.lang.Thread.State: WAITING (on object monitor)


at java.lang.Object.wait(java.base@17.0.5/Native Method)
  - waiting on 
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
  - locked <0xa29241f8> (a 
org.apache.kafka.common.utils.KafkaThread)
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
  at 
java.lang.ApplicationShutdownHooks.runHooks(java.base@17.0.5/ApplicationShutdownHooks.java:107)
  at 
java.lang.ApplicationShutdownHooks$1.run(java.base@17.0.5/ApplicationShutdownHooks.java:46)
  at java.lang.Shutdown.runHooks(java.base@17.0.5/Shutdown.java:130)
  at java.lang.Shutdown.exit(java.base@17.0.5/Shutdown.java:173)
  - locked <0xffe020b8> (a java.lang.Class for 
java.lang.Shutdown)
  at java.lang.Runtime.exit(java.base@17.0.5/Runtime.java:115)
  at java.lang.System.exit(java.base@17.0.5/System.java:1860)
  at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
  at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
  at org.apache.kafka.common.utils.Exit.exit(Exit.java:62)
  at 
org.apache.kafka.server.fault.ProcessExitingFaultHandler.handleFault(ProcessExitingFaultHandler.java:54)
  at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:891)
  at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1.apply(QuorumController.java:874)
  at 
org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:969){code}
and
{code:java}
  "kafka-shutdown-hook" #35 prio=5 os_prio=0 cpu=43.42ms elapsed=378593.04s 
allocated=4732K defined_classes=74 tid=0x7f5a7c09d800 nid=0x4f37 in 
Object.wait()  [0x7f5a47afd000]
 java.lang.Thread.State: WAITING (on object monitor)
  at java.lang.Object.wait(java.base@17.0.5/Native Method)
  - waiting on 
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1304)
  - locked <0xa272bcb0> (a 
org.apache.kafka.common.utils.KafkaThread)
  at java.lang.Thread.join(java.base@17.0.5/Thread.java:1372)
  at 
org.apache.kafka.queue.KafkaEventQueue.close(KafkaEventQueue.java:509)
  at 
org.apache.kafka.controller.QuorumController.close(QuorumController.java:2553)
  at kafka.server.ControllerServer.shutdown(ControllerServer.scala:521)
  at kafka.server.KafkaRaftServer.shutdown(KafkaRaftServer.scala:184)
  at kafka.Kafka$.$anonfun$main$3(Kafka.scala:99)
  at kafka.Kafka$$$Lambda$406/0x000800fb9730.apply$mcV$sp(Unknown 
Source)
  at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
  at kafka.Kafka$$$Lambda$407/0x000800fb9a10.run(Unknown Source)
  at java.lang.Thread.run(java.base@17.0.5/Thread.java:833)
  at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) 
{code}
h1. Possible Solution

A possible solution is to have the controller's unhandled fault handler call 
`Runtime.halt` instead of `Runtime.exit`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1100539790


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public interface VersionedKeyValueStore<K, V> extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete
+ * @param timestamp The timestamp for this record version
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+void put(K key, V value, long timestamp);
+
+/**
+ * Delete the value associated with this key from the store, at the 
specified timestamp
+ * (if there is such a value), and return the deleted value.
+ * 
+ * This operation is semantically equivalent to {@link #get(Object, long)} 
#get(key, timestamp))}
+ * followed by {@link #put(Object, Object, long) #put(key, null, 
timestamp)}.
+ *
+ * @param key   The key
+ * @param timestamp The timestamp for this delete
+ * @return The value and timestamp of the latest record associated with 
this key
+ * as of the deletion timestamp (inclusive), or {@code null} if 
any of

Review Comment:
   Totally -- JavaDocs are not public API. It's, well, _docs_; if we have them 
in the KIP, it's just a template/placeholder. No need to go back to the KIP 
(otherwise we could never change JavaDocs without altering older KIPs).
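   For readers skimming the thread, a minimal usage fragment for the interface
   quoted above (only calls named in this PR's javadoc are used; how the store
   instance is obtained is out of scope here):
   ```java
   static void example(VersionedKeyValueStore<String, String> store) {
       store.put("k", "v1", 10L);                                 // version valid from t=10
       store.put("k", "v2", 20L);                                 // "v1" is now valid for [10, 20)
       VersionedRecord<String> asOf15 = store.get("k", 15L);      // returns "v1"
       VersionedRecord<String> deleted = store.delete("k", 30L);  // returns "v2", the latest as of t=30
   }
   ```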



##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the 

[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-08 Thread via GitHub


Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1100547585


##
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link 
InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link 
ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   Note that this class is only used by the `ConsoleProducer` and has one 
implementation. If no further use is intended, it can be removed.
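   For illustration, a rough sketch of what a custom implementation could look like
   (assuming the interface keeps the legacy `kafka.common.MessageReader` shape of
   `init`/`readMessage`/`close`; the "topic" property name and the generic types are
   assumptions):
   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class SimpleLineReader implements MessageReader {
       private BufferedReader reader;
       private String topic;

       @Override
       public void init(InputStream inputStream, Properties props) {
           this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
           this.topic = props.getProperty("topic");
       }

       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               // A null record signals end of input to the ConsoleProducer read loop.
               return line == null ? null : new ProducerRecord<>(topic, line.getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       @Override
       public void close() throws IOException {
           if (reader != null) reader.close();
       }
   }
   ```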



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686048#comment-17686048
 ] 

ASF GitHub Bot commented on KAFKA-14692:


mumrah merged PR #487:
URL: https://github.com/apache/kafka-site/pull/487




> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the facts that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However if I do that, my brokers fails to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686045#comment-17686045
 ] 

ASF GitHub Bot commented on KAFKA-14692:


mumrah commented on PR #487:
URL: https://github.com/apache/kafka-site/pull/487#issuecomment-1423068675

   > Can you also open the same PR against kafka:3.4?
   
   Yes, I'll work on that later this week.




> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the facts that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However if I do that, my brokers fails to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-02-08 Thread via GitHub


mjsax commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1098117328


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * 
+ * This store implementation consists of a "latest value store" and "segment 
stores." The latest
+ * record version for each key is stored in the latest value store, while 
older record versions
+ * are stored in the segment stores. Conceptually, each record version has two 
associated
+ * timestamps:
+ * 
+ * a {@code validFrom} timestamp. This timestamp is explicitly 
associated with the record
+ * as part of the {@link VersionedKeyValueStore#put(Object, Object, 
long)}} call to the store;
+ * i.e., this is the record's timestamp.
+ * a {@code validTo} timestamp. This is the timestamp of the next 
record (or deletion)
+ * associated with the same key, and is implicitly associated with the 
record. This timestamp
+ * can change as new records are inserted into the store.
+ * 
+ * The validity interval of a record is from validFrom (inclusive) to validTo 
(exclusive), and
+ * can change as new record versions are inserted into the store (and validTo 
changes as a result).
+ * 
+ * Old record versions are stored in segment stores according to their validTo 
timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can 
be dropped from the
+ * store at a time, once the records contained in the segment are no longer 
relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same 
key) within a single
+ * segment are stored together using the format specified in {@link 
RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBVersionedStore.class);
+// a marker to indicate that no record version has yet been found as part 
of an ongoing
+// put() procedure. any value which is not a valid record timestamp will 
do.
+private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+private final String name;
+private final long historyRetention;
+private final RocksDBMetricsRecorder metricsRecorder;
+
+private final RocksDBStore latestValueStore;
+private final LogicalKeyValueSegments segmentStores;
+private final LatestValueSchema latestValueSchema;
+private final SegmentValueSchema segmentValueSchema;
+private final 

[GitHub] [kafka] ahuang98 opened a new pull request, #13221: MINOR: Fix hyperlink tags in upgrade docs

2023-02-08 Thread via GitHub


ahuang98 opened a new pull request, #13221:
URL: https://github.com/apache/kafka/pull/13221

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686042#comment-17686042
 ] 

ASF GitHub Bot commented on KAFKA-14692:


mumrah commented on code in PR #487:
URL: https://github.com/apache/kafka-site/pull/487#discussion_r1100528500


##
34/ops.html:
##
@@ -3615,8 +3620,21 @@ Preparing for migration
   Provisioning the KRaft controller quorum
   
 Two things are needed before the migration can begin. The brokers must be 
configured to support the migration and
-a KRaft controller quorum must be deployed. For the KRaft deployment, 
please refer to the above documentation.
-The KRaft controllers should be provisioned with the latest 
metadata.version of "3.4".
+a KRaft controller quorum must be deployed. The KRaft controllers should 
be provisioned with the same cluster ID as
+the existing Kafka cluster. This can be found by examining one of the 
"meta.properties" files in the data directories
+of the brokers, or by running the following command.
+  
+
+  
+./bin/zookeeper-shell.sh localhost:2181 get /cluster/id

Review Comment:
   Good call. I'll adjust the other pre-formatted sections as well.





> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the facts that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However if I do that, my brokers fails to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686041#comment-17686041
 ] 

ASF GitHub Bot commented on KAFKA-14692:


mimaison commented on code in PR #487:
URL: https://github.com/apache/kafka-site/pull/487#discussion_r1100522020


##
34/ops.html:
##
@@ -3615,8 +3620,21 @@ Preparing for migration
   Provisioning the KRaft controller quorum
   
 Two things are needed before the migration can begin. The brokers must be 
configured to support the migration and
-a KRaft controller quorum must be deployed. For the KRaft deployment, 
please refer to the above documentation.
-The KRaft controllers should be provisioned with the latest 
metadata.version of "3.4".
+a KRaft controller quorum must be deployed. The KRaft controllers should 
be provisioned with the same cluster ID as
+the existing Kafka cluster. This can be found by examining one of the 
"meta.properties" files in the data directories
+of the brokers, or by running the following command.
+  
+
+  
+./bin/zookeeper-shell.sh localhost:2181 get /cluster/id

Review Comment:
   Text within `<pre>` blocks keeps the padding and the final line ending. I 
find the docs much more readable when we remove the spaces and avoid having a 
new line at the end.
   ```
 
   ./bin/zookeeper-shell.sh localhost:2181 get /cluster/id
   ```
   
   This applies to all other formatted blocks in this section.





> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the facts that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However if I do that, my brokers fails to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686029#comment-17686029
 ] 

Mickael Maison commented on KAFKA-14692:


In my test cluster the errors persist.

After the controller printed "Completed migration of metadata from Zookeeper to 
KRaft", it started printing "TRACE Sending RPCs to brokers for metadata delta." 
every 500ms.

Then I updated the config of one of the brokers:
 - changed broker.id to node.id
 - added process.roles=broker
 - removed all zookeeper configs
Restarted that broker and it spams:

{code:java}
[2023-02-08 19:06:20,410] ERROR Exception while processing request from 
192.168.1.11:9092-192.168.1.11:65127-216 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key LEADER_AND_ISR which is not enabled
{code}
On the controller side, there are a few disconnection messages as broker 0 is 
restarted:
{code:java}
[2023-02-08 19:05:00,454] INFO [Controller id=1000, targetBrokerId=0] Node 0 
disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-02-08 19:05:00,454] WARN [Controller id=1000, targetBrokerId=0] 
Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
[2023-02-08 19:05:00,473] INFO [Controller id=1000, targetBrokerId=0] Client 
requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
{code}
and then it starts spamming
{code:java}
[2023-02-08 19:05:09,204] INFO [Controller id=1000, targetBrokerId=0] Cancelled 
in-flight LEADER_AND_ISR request with correlation id 10 due to node 0 being 
disconnected (elapsed time since creation: 1535ms, elapsed time since send: 
1535ms, request timeout: 3ms) (org.apache.kafka.clients.NetworkClient)
[2023-02-08 19:05:09,241] INFO [Controller id=1000, targetBrokerId=0] Client 
requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2023-02-08 19:05:09,381] INFO [Controller id=1000, targetBrokerId=0] Node 0 
disconnected. (org.apache.kafka.clients.NetworkClient) 
{code}
 

 

> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the fact that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts

2023-02-08 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686015#comment-17686015
 ] 

Travis Bischel commented on KAFKA-9803:
---

Would it be possible to move this forward? It looks like timed-out transactions 
are still unrecoverable as of 3.4, because the TRANSACTION_TIMED_OUT error 
returned from EndTransactionRequest has not been implemented yet.
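To illustrate the impact with today's transactional producer API, a minimal sketch (the producer and record are assumed to be created elsewhere; this is not the proposed change, just the pattern applications are forced into while a timeout is indistinguishable from fencing):
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch only: 'producer' is a transactional KafkaProducer and 'record' a ProducerRecord.
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal today: a transaction timed out by the coordinator surfaces exactly like a
    // fenced producer, so the application has no choice but to close and recreate it.
    producer.close();
} catch (KafkaException e) {
    // Abortable: abort and retry with the same producer instance.
    producer.abortTransaction();
}
{code}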

> Allow producers to recover gracefully from transaction timeouts
> ---
>
> Key: KAFKA-9803
> URL: https://issues.apache.org/jira/browse/KAFKA-9803
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer, streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> Transaction timeouts are detected by the transaction coordinator. When the 
> coordinator detects a timeout, it bumps the producer epoch and aborts the 
> transaction. The epoch bump is necessary in order to prevent the current 
> producer from being able to begin writing to a new transaction which was not 
> started through the coordinator.  
> Transactions may also be aborted if a new producer with the same 
> `transactional.id` starts up. Similarly this results in an epoch bump. 
> Currently the coordinator does not distinguish these two cases. Both will end 
> up as a `ProducerFencedException`, which means the producer needs to shut 
> itself down. 
> We can improve this with the new APIs from KIP-360. When the coordinator 
> times out a transaction, it can remember that fact and allow the existing 
> producer to claim the bumped epoch and continue. Roughly the logic would work 
> like this:
> 1. When a transaction times out, set lastProducerEpoch to the current epoch 
> and do the normal bump.
> 2. Any transactional requests from the old epoch result in a new 
> TRANSACTION_TIMED_OUT error code, which is propagated to the application.
> 3. The producer recovers by sending InitProducerId with the current epoch. 
> The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH 
> from Produce requests. Partition leaders will not generally know if a bumped 
> epoch was the result of a timed out transaction or a fenced producer. 
> Possibly the producer can treat these errors as abortable when they come from 
> Produce responses. In that case, the user would try to abort the transaction 
> and then we can see if it was due to a timeout or otherwise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100057051


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of 
the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+private static final Logger log = 
LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+private LeaderEndPoint leader;
+private ReplicaManager replicaMgr;
+private Integer fetchBackOffMs;
+
+public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+  ReplicaManager replicaMgr) {
+this.leader = leader;
+this.replicaMgr = replicaMgr;
+}
+
+
+/**
+ * Start the tier state machine for the provided topic partition. 
Currently, this start method will build the
+ * entire remote aux log state synchronously.
+ *
+ * @param topicPartition the topic partition
+ * @param currentFetchState the current PartitionFetchState which will
+ *  be used to derive the return value
+ * @param fetchPartitionData the data from the fetch response that 
returned the offset moved to tiered storage error
+ *
+ * @return the new PartitionFetchState after the successful start of the
+ * tier state machine
+ */
+public PartitionFetchState start(TopicPartition topicPartition,
+ PartitionFetchState currentFetchState,
+ PartitionData fetchPartitionData) throws 
Exception {
+
+Tuple2 epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = (int) epochAndLeaderLocalStartOffset._1;
+long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+long offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset);
+
+Tuple2 fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long 

[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-08 Thread via GitHub


dajac commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1100495875


##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -212,24 +248,32 @@ public static ApiVersionCollection 
collectApis(Set apiKeys) {
  * @param listenerType the listener type which constrains the set of 
exposed APIs
  * @param minRecordVersion min inter broker magic
  * @param activeControllerApiVersions controller ApiVersions
+ * @param enableUnstableLastVersion whether unstable versions should be 
advertised or not

Review Comment:
   that's right but only the last version of an api could be unstable. this is 
where the "last" comes from.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-14692:


Assignee: David Arthur

> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the fact that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686012#comment-17686012
 ] 

David Arthur commented on KAFKA-14692:
--

Thanks [~mimaison]! Here's a PR with the fixes 
https://github.com/apache/kafka-site/pull/487

For 4, you should remove the migration flag and the ZK configs.

The "Received request api key LEADER_AND_ISR which is not enabled" error means 
the controller thinks this broker is still in ZK mode. This could happen if 
there was an in-flight LISR for the previous broker incarnation before the 
controller got the new registration. Does that error persist, or only happen 
during startup?
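For reference, a rough sketch of what the broker config could look like once the migration flag and all ZooKeeper properties are removed (the ids, ports and listener names below are placeholders, not prescriptive values):
{code:java}
# Sketch only - adjust ids, listeners and quorum voters to your cluster.
process.roles=broker
node.id=0
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://localhost:9092
# zookeeper.metadata.migration.enable and all zookeeper.* properties are removed.
{code}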

> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: David Arthur
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> 1) It completely skips the fact that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #13203: KAFKA-14048: Add new `__consumer_offsets` records from KIP-848

2023-02-08 Thread via GitHub


mimaison commented on code in PR #13203:
URL: https://github.com/apache/kafka/pull/13203#discussion_r1100494370


##
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json:
##
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// KIP-848 is in development. This schema is subject to 
non-backwards-compatible changes.
+{
+  "type": "data",
+  "name": "ConsumerGroupCurrentMemberAssignmentKey",
+  "validVersions": "8",

Review Comment:
   I see, thanks for the explanation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100491548


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.clients.admin.ScramMechanism;
+
+// import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+// import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the SCRAM credentials in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramImage {
+public static final ScramImage EMPTY = new 
ScramImage(Collections.emptyMap());
+
+private final Map> 
mechanisms;
+
+public ScramImage(Map> 
mechanisms) {
+this.mechanisms = Collections.unmodifiableMap(mechanisms);
+}
+
+public void write(ImageWriter writer, ImageWriterOptions options) {

Review Comment:
   This is the same as in `AclsImage.java`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-08 Thread via GitHub


jeffkbkim commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1100490932


##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -212,24 +248,32 @@ public static ApiVersionCollection 
collectApis(Set apiKeys) {
  * @param listenerType the listener type which constrains the set of 
exposed APIs
  * @param minRecordVersion min inter broker magic
  * @param activeControllerApiVersions controller ApiVersions
+ * @param enableUnstableLastVersion whether unstable versions should be 
advertised or not

Review Comment:
   i'm a bit confused by the word "last". does this mean we only enable it if the 
last version is unstable? my understanding is we either enable all unstable 
versions or we don't



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


mimaison commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100386611


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##
@@ -80,18 +81,25 @@ public class MirrorMakerConfig extends AbstractConfig {
 static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
 static final String SOURCE_PREFIX = "source.";
 static final String TARGET_PREFIX = "target.";
+static final String ENABLE_INTERNAL_REST_CONFIG = 
"dedicated.mode.enable.internal.rest";
+private static final String ENABLE_INTERNAL_REST_DOC =
+"Whether to bring up an internal-only REST server that allows 
multi-node clusters to operate correctly";

Review Comment:
   Nit: We usually have a period at the end of descriptions.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+@Context
+private UriInfo uriInfo;
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalMirrorResource.class);
+
+private final Map herders;
+
+public InternalMirrorResource(Map herders, 
RestClient restClient) {
+super(restClient);
+this.herders = herders;
+}
+
+@Override
+protected Herder herderForRequest() {
+String source = pathParam("source");
+String target = pathParam("target");
+Herder result = herders.get(new SourceAndTarget(source, target));
+if (result == null) {
+log.debug("Failed to find herder for source '{}' and target '{}'", 
source, target);
+throw new NotFoundException("No replication flow found for source 
'" + source + "' and target '" + target + "'");
+}
+return result;
+}
+
+private String pathParam(String name) {
+List result = uriInfo.getPathParameters().get(name);
+if (result == null || result.isEmpty())
+throw new NotFoundException();

Review Comment:
   Should we add a message here?
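   One possible shape, just as a sketch (the lines past the quoted snippet are assumed):
   ```java
   private String pathParam(String name) {
       List<String> result = uriInfo.getPathParameters().get(name);
       if (result == null || result.isEmpty())
           // Name the missing parameter so the 404 is self-explanatory.
           throw new NotFoundException("Missing required path parameter '" + name + "'");
       return result.get(0);
   }
   ```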



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import 

[GitHub] [kafka] dajac commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-08 Thread via GitHub


dajac commented on PR #12972:
URL: https://github.com/apache/kafka/pull/12972#issuecomment-1422991741

   Thanks @hachikuji!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100467620


##
metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static 
org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+static final int MAX_ITERATIONS = 16384;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+ScramControlManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+return new ScramControlManager(logContext,
+snapshotRegistry);
+}
+}
+
+static class ScramCredentialKey {
+private final String username;
+private final ScramMechanism mechanism;
+
+ScramCredentialKey(String username, ScramMechanism mechanism) {
+this.username = username;
+this.mechanism = mechanism;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(username, mechanism);
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null) return false;
+if (!(o.getClass() == this.getClass())) return false;
+ScramCredentialKey other = (ScramCredentialKey) o;
+return username.equals(other.username) &&
+mechanism.equals(other.mechanism);
+}
+
+@Override
+public String toString() {
+return "ScramCredentialKey" +
+"(username=" + username +
+", mechanism=" + mechanism +
+")";
+}
+}
+
+static class ScramCredentialValue {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+ScramCredentialValue(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+@Override
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-08 Thread via GitHub


hachikuji commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1100465745


##
generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java:
##
@@ -411,7 +409,7 @@ private void generateListenerTypesEnum() {
 }
 
 private void generateHighestStableVersion() {

Review Comment:
   nit: `generateHighestSupportedVersion`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-08 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1100463203


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   Not this one. We will introduce another one for the client. They are close 
but different and we want to evolve them separately in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-08 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1100462303


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The assignment specification for a consumer group member.
+ */
+public class AssignmentMemberSpec {

Review Comment:
   The server side assignor does not have those: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-BrokerAPI.
 Note that the KIP introduces two new `PartitionAssignor` interfaces. One on 
the server side and one one the client side. They are slightly different.
   
   Regarding the `reason`, the `reason` is basically an error code defined by a 
custom assignor (e.g. streams). A member will use this to pass information to 
the assignor. You can see the ones that Streams will use 
[here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberMetadataReasons).
 Using a string is not really appropriate for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -98,6 +104,24 @@ private boolean flushing() {
 return toFlush != null;
 }
 
+public boolean waitForBeginFlush(Supplier timeout, TimeUnit 
timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption 
made in `beginFlush` that we'll never try to trigger two offset flushes at 
once, which is clearly false given the conditions that necessitate this fix 
(i.e., a task's end-of-life offset flush is triggered at the same time as its 
periodic offset flush).
   
   Given that, do we really need a separate method here, or can we relax the 
constraints in `beginFlush` to wait for in-progress flushes to conclude instead 
of throwing an exception if there are any?
   
Additionally, it seems like the use of a `CompletableFuture` here is a bit 
strange. Would a `Semaphore` or `CountDownLatch` be more suited?
   
   Finally--since this change may lead to us performing double offset commits 
when a task is being shut down, do you think it might also make sense to add a 
`close` method to the offset writer that throws an exception for any further 
attempts to flush, and possibly forcibly terminates any in-progress flushes? We 
can invoke that in `AbstractWorkerTask::cancel` (or possibly 
`WorkerSourceTask::cancel` if a different approach is necessary to preserve 
exactly-once semantics) to help tasks complete shutdown within the timeout 
allotted to them.
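   To make the second point concrete, a rough sketch of what a semaphore-based `beginFlush` could look like (field and method names are illustrative, not the actual `OffsetStorageWriter` API):
   ```java
   private final Semaphore flushPermit = new Semaphore(1);

   public boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
       // Wait for any in-progress flush instead of throwing when one exists.
       if (!flushPermit.tryAcquire(timeout, unit))
           throw new TimeoutException("Timed out waiting for a concurrent offset flush to finish");
       synchronized (this) {
           if (data.isEmpty()) {
               flushPermit.release(); // nothing to flush, let the next caller proceed
               return false;
           }
           toFlush = data;           // snapshot the pending offsets for this flush
           data = new HashMap<>();
           return true;
       }
   }

   // cancelFlush() and the flush-completion paths would release the permit once toFlush is cleared.
   ```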



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-08 Thread via GitHub


dajac commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1100458653


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The assignment specification for a consumer group member.

Review Comment:
   That's correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100451315


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -816,6 +817,17 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleAlterUserScramCredentials(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+val alterRequest = request.body[AlterUserScramCredentialsRequest]
+val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,

Review Comment:
   Let's chat about this. Currently, talking directly with the controller is not 
allowed for these operations. Only brokers are allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-08 Thread via GitHub


jeffkbkim commented on code in PR #13202:
URL: https://github.com/apache/kafka/pull/13202#discussion_r1100419447


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The assignment specification for a consumer group member.
+ */
+public class AssignmentMemberSpec {

Review Comment:
   from kip 848, it looks like we're missing
   ```
   /**
* The reason reported by the member.
*/
   byte reason; 
   ...
* The metadata reported by the member.
*/
   Metadata metadata;
   ```
   what's the reason we omitted these? also, why can't we store "reason" as a 
string? asking because this made things harder to debug from the server side 



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The assignment specification for a consumer group member.

Review Comment:
   to confirm, this partition assignor is only used by the new consumer group 
hence "consumer group"; is this correct?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side partition assignor used by the GroupCoordinator.

Review Comment:
   isn't this used by the new client side assignor as well? 
   > The new PartitionAssignor interface will be introduced to replace the 
ConsumerPartitionAssignor interface
   
   also, the kip mentions that we'll be adding uniform & range for server side 
assignor. what about client side assignors?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100441588


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2455,7 +2454,7 @@ private void close(Duration timeout, boolean 
swallowException) {
 closeTimer.reset(remainingDurationInTimeout);
 
 // This is a blocking call bound by the time remaining in 
closeTimer
-LambdaUtils.swallow(() -> fetcher.close(closeTimer), 
firstException);
+Utils.swallow(log, " fetcher close", () -> 
fetcher.close(closeTimer), firstException);

Review Comment:
   nit: There is an extra space before `fetcher`.



##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -238,7 +238,7 @@ public KafkaClusterTestKit build() throws Exception {
 bootstrapMetadata);
 } catch (Throwable e) {
 log.error("Error creating controller {}", node.id(), 
e);
-Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController());
+Utils.swallow(log, "sharedServer.stopForController", 
() -> sharedServer.stopForController(), null);

Review Comment:
   nit: Should we add an overload to avoid having to pass null here?
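   Something like this, as a sketch:
   ```java
   // Convenience overload for callers that don't track a first exception.
   public static void swallow(Logger log, String what, Runnable code) {
       swallow(log, what, code, null);
   }
   ```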



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -998,15 +998,18 @@ public static void closeAll(Closeable... closeables) 
throws IOException {
 throw exception;
 }
 
-public static void swallow(
-Logger log,
-String what,
-Runnable runnable
-) {
-try {
-runnable.run();
-} catch (Throwable e) {
-log.warn("{} error", what, e);
+/**
+ * Run the supplied code. If an exception is thrown, it is swallowed and 
registered to the firstException parameter.
+ */
+public static void swallow(Logger log, String what, final Runnable code, 
final AtomicReference firstException) {
+if (code != null) {
+try {
+code.run();
+} catch (Throwable t) {
+log.warn("{} error", what, t);

Review Comment:
   Should this be an error instead of a warn to be consistent with 
`closeQuietly`? Moreover, I wonder if we could improve the error message. We 
would get something like `fetcher close error` which is not really in line with 
what we usually log. For instance, `closeQuietly` would log something like 
`Failed to close fetch...`. Do you have any thoughts on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-02-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685984#comment-17685984
 ] 

Chris Egerton commented on KAFKA-14682:
---

Thanks Christo!

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Christo Lolov
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.
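As an illustration of the strict-stubbing behaviour described above (not taken from the Kafka code base), a JUnit 5 test like the following fails with an UnnecessaryStubbingException when the stubbed call is never exercised:
{code:java}
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.util.function.Supplier;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class StrictStubbingExampleTest {

    @Mock
    private Supplier<String> supplier;

    @Test
    void unusedStubbingIsReported() {
        when(supplier.get()).thenReturn("never used"); // flagged as an unnecessary stubbing
        // the test never calls supplier.get(), so strict stubs should fail this test
    }
}
{code}
Whether that failure actually surfaces then depends on which Gradle task runs the test, which is what this ticket is about.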



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1422939422

   Oh, and the Jenkins build seems to be consistently failing on the 
`IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored` 
test case. Probably want to look into that before we merge 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1099340592


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
 int cnt = 0;
 for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
 for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster, topicName, p, "key", "value-" + cnt++);
 }
-
+
+protected void produce(EmbeddedConnectCluster cluster, String topic, 
Integer partition, String key, String value) {
+cluster.kafka().produce(topic, partition, key, value);
+}
+
+protected static Map 
waitForAnyCheckpoint(

Review Comment:
   Nit: why "anyCheckpoint" instead of "allCheckpoints", given that we're 
ensuring that there's a checkpoint for every partition of the topic, instead of 
only one?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition 
sourceTopicPartition, long upstr
 // Offset is too far in the past to translate accurately
 return OptionalLong.of(-1L);
 }
-long upstreamStep = upstreamOffset - 
offsetSync.get().upstreamOffset();
-return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
+return OptionalLong.of(offsetSync.get().downstreamOffset() + 
(offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   I believe this part is correct, but it took me a while to grok the 
motivation here. Could we pull this out into two lines, one to determine the 
amount we bump by (i.e., 0 or 1) with a comment providing a rationale for the 
logic (including why we can't do a more aggressive bump derived from the delta 
between the upstream offsets for the consumer group and the checkpoint), and 
one to return the downstream offset plus that bump?
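   For example, something along these lines (names follow the quoted diff; the rationale in the comment is my reading of it):
   ```java
   // The sync only tells us that upstreamOffset() maps to downstreamOffset(); for anything
   // newer we cannot know how many downstream records exist past that point, so the most we
   // can safely claim is "one past the last synced downstream offset", not the full upstream delta.
   long bump = offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1;
   return OptionalLong.of(offsetSync.get().downstreamOffset() + bump);
   ```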



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
 // have been automatically synchronized from primary to backup by the 
background job, so no
 // more records to consume from the replicated topic by the same 
consumer group at backup cluster
 assertEquals(0, records.count(), "consumer record size is not zero");
+backupConsumer.close();

Review Comment:
   Good catch! Can we use a try-with-resources block for these consumers? 
Should set a precedent that makes it harder to leak them in the future.
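   For example (helper names here are placeholders rather than the actual test utilities):
   ```java
   try (KafkaConsumer<byte[], byte[]> backupConsumer = createBackupConsumer(consumerGroupName)) {
       ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(5));
       assertEquals(0, records.count(), "consumer record size is not zero");
   }
   // close() is implicit, so the consumer cannot leak even if the assertion throws.
   ```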



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -725,11 +730,20 @@ protected static  void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster
 long consumerGroupOffsetTotal = 
consumerGroupOffsets.values().stream()
 .mapToLong(OffsetAndMetadata::offset).sum();
 
-Map offsets = consumer.endOffsets(tps, 
CONSUMER_POLL_TIMEOUT_MS);
-long totalOffsets = offsets.values().stream().mapToLong(l -> 
l).sum();
-
+Map 
offsets =

Review Comment:
   Nit: "offsets" and "totalOffsets" are a little generic and confusing; WDYT 
about "endOffsets" and "totalEndOffsets"?



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() 
throws InterruptedExceptio
 Map translatedOffsets = 
backupClient.remoteConsumerOffsets(
 consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
 return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
-   translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));

Review Comment:
   If I'm reading this correctly, the reason we don't expect `tp2` to have 
translated offsets here is because the upstream consumer group offset for this 
partition is behind the checkpoints emitted for it.
   
   If that's correct:
   1. Any idea why this was passing before?
   2. Do you think we should expand the test to commit an offset for this 
partition in the consumer group, then verify that that offset is included in 
the translated offsets afterward?
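   
   For (2), something along these lines could work (a very rough sketch; the admin 
client handle for the primary cluster and the wait helper are assumptions on my 
part, the rest follows the existing test code):
   
   ```java
   // Commit an offset for tp2 in the source cluster's consumer group ...
   try (Admin primaryAdmin = primary.kafka().createAdminClient()) {
       primaryAdmin.alterConsumerGroupOffsets(consumerGroupName,
               Collections.singletonMap(tp2, new OffsetAndMetadata(1L))).all().get();
   }
   // ... and then wait for that offset to show up in the translated offsets.
   waitForCondition(() -> backupClient.remoteConsumerOffsets(consumerGroupName,
                   PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L))
           .containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)),
           "offset for tp2 was never translated after the commit");
   ```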



##

[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


omkreddy commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1100299605


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -816,6 +817,17 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleAlterUserScramCredentials(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+val alterRequest = request.body[AlterUserScramCredentialsRequest]
+val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,

Review Comment:
   We need to add an `authHelper.authorizeClusterOperation(request, ALTER)` 
authorization check here.
   
   A similar check exists in KafkaApis:
   
[handleAlterUserScramCredentialsRequest](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3326)



##
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala:
##
@@ -85,15 +113,17 @@ class AlterUserScramCredentialsRequestTest extends 
BaseRequestTest {
   .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
 )
 requests.foreach(request => {
-  val response = sendAlterUserScramCredentialsRequest(request)
+  val response = sendAlterUserScramCredentialsRequest(request, 
adminSocketServer)
   val results = response.data.results
   assertEquals(2, results.size)
   checkAllErrorsAlteringCredentials(results, Errors.DUPLICATE_RESOURCE, 
"when altering the same credential twice in a single request")
 })
   }
 
-  @Test
-  def testAlterEmptyUser(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testAlterEmptyUser(quorum: String): Unit = {
+  println("Starting test")

Review Comment:
   unwanted line



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -221,6 +223,21 @@ class BrokerMetadataPublisher(
   s"quotas in ${deltaName}", t)
   }
 
+  // Apply changes to SCRAM credentials.
+  Option(delta.scramDelta()).foreach { scramDelta =>

Review Comment:
   May I know how we are applying these changes on controller nodes: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerServer.scala#L176



##
metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Represents a SCRAM credential in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramCredentialData {
+private final byte[] salt;
+private final byte[] saltedPassword;
+private final int iterations;
+
+static ScramCredentialData fromRecord(
+UserScramCredentialRecord record
+) {
+return new ScramCredentialData(
+record.salt(),
+record.saltedPassword(),
+record.iterations());
+}
+
+public ScramCredentialData(
+byte[] salt,
+byte[] saltedPassword,
+int iterations
+) {
+this.salt = salt;
+this.saltedPassword = saltedPassword;
+this.iterations = iterations;
+}
+
+public byte[] salt() {
+return salt;
+}
+
+public byte[] saltedPassword() {
+return saltedPassword;
+}
+
+public int iterations() {
+return iterations;
+}
+
+public UserScramCredentialRecord toRecord(
+String userName,
+ScramMechanism mechanism
+) {
+return new UserScramCredentialRecord().
+setName(userName).
+setMechanism(mechanism.type()).
+setSalt(salt).
+

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100385270


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -447,6 +430,33 @@ private RequestFuture 
sendMetadataRequest(MetadataRequest.Builde
 return client.send(node, request);
 }
 
+/**
+ * Send Fetch Request to Kafka cluster asynchronously.
+ *
+ * This method is visible for testing.
+ *
+ * @return A future that indicates result of sent Fetch request
+ */
+private RequestFuture sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
+ final Node 
fetchTarget) {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100384922


##
clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Lambda helpers.
+ */
+@FunctionalInterface
+public interface LambdaUtils {
+/**
+ * Run some code, possibly throw some exceptions.
+ *
+ * @throws Exception
+ */
+void run() throws Exception;
+
+/**
+ * Provide an idempotent instance of the supplied code - ensure that the 
supplied code gets run only once, no
+ * matter how many times .run() is called.
+ */
+static Runnable idempotent(final Runnable code) {

Review Comment:
   No, it's not used anywhere; I added it as a helper for the future. But I have 
now removed this file completely and moved only the essential functions to 
`Utils.java`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100383535


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2425,17 +2426,38 @@ private ClusterResourceListeners 
configureClusterResourceListeners(Deserializer<
 return clusterResourceListeners;
 }
 
-private void close(long timeoutMs, boolean swallowException) {
+private Timer createTimerForRequest(final Duration timeout) {
+// this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+final Time localTime = (time == null) ? Time.SYSTEM : time;
+return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
+private void close(Duration timeout, boolean swallowException) {
 log.trace("Closing the Kafka consumer");
 AtomicReference firstException = new AtomicReference<>();
-try {
-if (coordinator != null)
-coordinator.close(time.timer(Math.min(timeoutMs, 
requestTimeoutMs)));
-} catch (Throwable t) {
-firstException.compareAndSet(null, t);
-log.error("Failed to close coordinator", t);
+
+final Timer closeTimer = createTimerForRequest(timeout);
+// Close objects with a timeout. The timeout is required because the 
coordinator & the fetcher send requests to
+// the server in the process of closing which may not respect the 
overall timeout defined for closing the
+// consumer.
+if (coordinator != null) {
+// This is a blocking call bound by the time remaining in 
closeTimer
+LambdaUtils.swallow(() -> coordinator.close(closeTimer), 
firstException);

Review Comment:
   Moved to Utils in latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools

2023-02-08 Thread via GitHub


vamossagar12 commented on PR #13127:
URL: https://github.com/apache/kafka/pull/13127#issuecomment-1422825052

   > @vamossagar12 as you can see from the CI output, there are checkstyle 
errors. Have you tried to run at least one of the system tests that are using 
the tool?
   
   Hi @fvaleri, yes, I am aware of the checkstyle issue and pointed it out 
here: https://github.com/apache/kafka/pull/13127#issuecomment-1396439952. I 
have run the StreamResetter tests and also modified the system test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100295718


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -447,6 +430,33 @@ private RequestFuture 
sendMetadataRequest(MetadataRequest.Builde
 return client.send(node, request);
 }
 
+/**
+ * Send Fetch Request to Kafka cluster asynchronously.
+ *
+ * This method is visible for testing.
+ *
+ * @return A future that indicates result of sent Fetch request
+ */
+private RequestFuture sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
+ final Node 
fetchTarget) {

Review Comment:
   nit: Indentation is off here.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2425,17 +2426,38 @@ private ClusterResourceListeners 
configureClusterResourceListeners(Deserializer<
 return clusterResourceListeners;
 }
 
-private void close(long timeoutMs, boolean swallowException) {
+private Timer createTimerForRequest(final Duration timeout) {
+// this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+final Time localTime = (time == null) ? Time.SYSTEM : time;
+return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
+private void close(Duration timeout, boolean swallowException) {
 log.trace("Closing the Kafka consumer");
 AtomicReference firstException = new AtomicReference<>();
-try {
-if (coordinator != null)
-coordinator.close(time.timer(Math.min(timeoutMs, 
requestTimeoutMs)));
-} catch (Throwable t) {
-firstException.compareAndSet(null, t);
-log.error("Failed to close coordinator", t);
+
+final Timer closeTimer = createTimerForRequest(timeout);
+// Close objects with a timeout. The timeout is required because the 
coordinator & the fetcher send requests to
+// the server in the process of closing which may not respect the 
overall timeout defined for closing the
+// consumer.
+if (coordinator != null) {
+// This is a blocking call bound by the time remaining in 
closeTimer
+LambdaUtils.swallow(() -> coordinator.close(closeTimer), 
firstException);

Review Comment:
   We have `closeQuietly` in `Utils` that is pretty close to this one. I think 
that `swallow` should be moved there as well. We don't need another `*Utils` 
class.
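   
   For reference, the shape I have in mind is roughly the following (a sketch of 
the pattern only, not the exact signature that should land in `Utils`):
   
   ```java
   // Run one step of close(), remember only the first failure, and keep going so
   // the remaining close steps still get a chance to run.
   static void swallow(final Runnable step, final AtomicReference<Throwable> firstException) {
       try {
           step.run();
       } catch (Throwable t) {
           firstException.compareAndSet(null, t);
       }
   }
   ```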



##
clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Lambda helpers.
+ */
+@FunctionalInterface
+public interface LambdaUtils {
+/**
+ * Run some code, possibly throw some exceptions.
+ *
+ * @throws Exception
+ */
+void run() throws Exception;
+
+/**
+ * Provide an idempotent instance of the supplied code - ensure that the 
supplied code gets run only once, no
+ * matter how many times .run() is called.
+ */
+static Runnable idempotent(final Runnable code) {

Review Comment:
   Is this one used anywhere? I can't find it.



##
clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * 

[GitHub] [kafka] littlehorse-eng commented on pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-02-08 Thread via GitHub


littlehorse-eng commented on PR #13082:
URL: https://github.com/apache/kafka/pull/13082#issuecomment-1422802097

   @lucasbru I used the repo you sent; the home page loaded, albeit without CSS, 
and when I clicked on the Streams documentation it yielded the same result as 
navigating to the file on my localhost. I must be doing something wrong...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ouyangnengda opened a new pull request, #13220: Kafka 14253

2023-02-08 Thread via GitHub


ouyangnengda opened a new pull request, #13220:
URL: https://github.com/apache/kafka/pull/13220

   Add the number of members in the log to improve readability.
   
   Example logs -
   
   `StreamsPartitionAssignor.java`
   
   ```xml
   Before - [2023-02-08 23:22:21,419] INFO stream-thread [] Assigned tasks 
[0_0, 0_1, 0_2, 0_3] including stateful [] to clients as: 
   ----0001=[activeTasks: ([0_0, 0_2, 0_3]) 
standbyTasks: ([])]
   ----0002=[activeTasks: ([0_1]) standbyTasks: 
([])]. 
   
   After - [2023-02-07 23:05:29,723] INFO stream-thread [] Assigned 4 tasks 
[0_0, 0_1, 0_2, 0_3] including 0 stateful [] to 2 clients as: 
   ----0001=[3 activeTasks: ([0_0, 0_2, 0_3]) 0 
standbyTasks: ([])]
   ----0002=[1 activeTasks: ([0_1]) 0 standbyTasks: 
([])]. 
   ```
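   
   The change itself is essentially just adding the collection sizes to the 
existing log lines, e.g. (argument names here are illustrative; the real 
statement lives in `StreamsPartitionAssignor`):
   
   ```java
   log.info("Assigned {} tasks {} including {} stateful {} to {} clients as:\n{}",
           allTasks.size(), allTasks,
           statefulTasks.size(), statefulTasks,
           clientStates.size(), assignmentSummary);
   ```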
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2023-02-08 Thread via GitHub


dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100288626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -1933,11 +1943,79 @@ private Map 
topicPartitionTags(TopicPartition tp) {
 }
 }
 
+// Visible for testing
+void maybeCloseFetchSessions(final Timer timer) {
+final Cluster cluster = metadata.fetch();
+final List> requestFutures = new 
ArrayList<>();
+for (final Map.Entry entry : 
sessionHandlers.entrySet()) {
+final FetchSessionHandler sessionHandler = entry.getValue();
+// set the session handler to notify close. This will set the next 
metadata request to send close message.
+sessionHandler.notifyClose();
+
+final int sessionId = sessionHandler.sessionId();
+final Integer fetchTargetNodeId = entry.getKey();
+// FetchTargetNode may not be available as it may have 
disconnected the connection. In such cases, we will
+// skip sending the close request.
+final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+log.debug("Skip sending close session request to broker {} 
since it is not reachable", fetchTarget);
+continue;
+}
+
+final RequestFuture responseFuture = 
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+responseFuture.addListener(new 
RequestFutureListener() {
+@Override
+public void onSuccess(ClientResponse value) {
+log.debug("Successfully sent a close message for fetch 
session: {} to node: {}", sessionId, fetchTarget);
+}
+
+@Override
+public void onFailure(RuntimeException e) {
+log.debug("Unable to a close message for fetch session: {} 
to node: {}. " +
+"This may result in unnecessary fetch sessions at the 
broker.", sessionId, fetchTarget, e);
+}
+});
+
+requestFutures.add(responseFuture);
+}
+
+// Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+// all requests have received a response.
+do {
+client.poll(timer, null, true);
+} while (timer.notExpired() && 
!requestFutures.stream().allMatch(RequestFuture::isDone));
+
+if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+// we ran out of time before completing all futures. It is ok 
since we don't want to block the shutdown
+// here.
+log.debug("All requests couldn't be sent in the specific timeout 
period {}ms. " +
+"This may result in unnecessary fetch sessions at the broker. 
Consider increasing the timeout passed for " +
+"KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+}
+}
+
+public void close(final Timer timer) {
+if (!isClosed.compareAndSet(false, true)) {
+log.info("Fetcher {} is already closed.", this);
+return;
+}
+
+// Shared states (e.g. sessionHandlers) could be accessed by multiple 
threads (such as heartbeat thread), hence,
+// it is necessary to acquire a lock on the fetcher instance before 
modifying the states.

Review Comment:
   Thanks for the clarification. That makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685935#comment-17685935
 ] 

Mickael Maison commented on KAFKA-14692:


cc [~mumrah] 

 

> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> It completely skips the fact that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14692:
---
Description: 
Following [https://kafka.apache.org/documentation/#kraft_zk_migration]

It completely skips the fact that the storage for the new quorum should be 
formatted using the existing cluster id.

2) In Provisioning the KRaft controller quorum:
{{controller.quorum.voters=1@localhost:9093}} should be 
{{controller.quorum.voters=3000@localhost:9093}} as node.id=3000

3) When migrating brokers, it states:
{code:java}
# Don't set the IBP, KRaft uses "metadata.version" feature flag
# inter.broker.protocol.version=3.4

# Keep the migration enabled
zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=localhost:2181
{code}
However, if I do that, my brokers fail to restart and print:

{code:java}
org.apache.kafka.common.config.ConfigException: If using 
zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also 
be set.
{code}


4) When disabling zookeeper.metadata.migration.enable or keeping 
zookeeper.connect to get past this step, when brokers restart they print a lot 
of error messages:

{code:java}
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key LEADER_AND_ISR which is not enabled
[2023-02-08 12:06:25,776] ERROR Exception while processing request from 
192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
{code}


 

  was:
Following https://kafka.apache.org/documentation/#kraft_zk_migration

It completely skips the fact that the storage for the new quorum should be 
formatted using the existing cluster id.

2) In Provisioning the KRaft controller quorum:
controller.quorum.voters=1@localhost:9093 should be 
controller.quorum.voters=3000@localhost:9093 as node.id=3000

3) When migrating brokers, it states:
# Don't set the IBP, KRaft uses "metadata.version" feature flag
# inter.broker.protocol.version=3.4

# Keep the migration enabled
zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=localhost:2181
However, if I do that, my brokers fail to restart and print:
org.apache.kafka.common.config.ConfigException: If using 
zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also 
be set.

4) When disabling zookeeper.metadata.migration.enable or keeping 
zookeeper.connect to get past this step, when brokers restart they print a lot 
of error messages:
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key LEADER_AND_ISR which is not enabled
[2023-02-08 12:06:25,776] ERROR Exception while processing request from 
192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor) 


> Issues in Zookeeper to KRaft migration docs
> ---
>
> Key: KAFKA-14692
> URL: https://issues.apache.org/jira/browse/KAFKA-14692
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Priority: Major
>
> Following [https://kafka.apache.org/documentation/#kraft_zk_migration]
> It completely skips the fact that the storage for the new quorum should 
> be formatted using the existing cluster id.
> 2) In Provisioning the KRaft controller quorum:
> {{controller.quorum.voters=1@localhost:9093}} should be 
> {{controller.quorum.voters=3000@localhost:9093}} as node.id=3000
> 3) When migrating brokers, it states:
> {code:java}
> # Don't set the IBP, KRaft uses "metadata.version" feature flag
> # inter.broker.protocol.version=3.4
> # Keep the migration enabled
> zookeeper.metadata.migration.enable=true
> # Remove ZooKeeper client configuration
> # zookeeper.connect=localhost:2181
> {code}
> However, if I do that, my brokers fail to restart and print:
> {code:java}
> org.apache.kafka.common.config.ConfigException: If using 
> zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must 
> also be set.
> {code}
> 4) When disabling zookeeper.metadata.migration.enable or keeping 
> zookeeper.connect to get past this step, when brokers restart they print a 
> lot of error messages:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-08 12:06:25,776] ERROR Exception while processing request from 
> 192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14692) Issues in Zookeeper to KRaft migration docs

2023-02-08 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14692:
--

 Summary: Issues in Zookeeper to KRaft migration docs
 Key: KAFKA-14692
 URL: https://issues.apache.org/jira/browse/KAFKA-14692
 Project: Kafka
  Issue Type: Improvement
  Components: docs
Reporter: Mickael Maison


Following https://kafka.apache.org/documentation/#kraft_zk_migration

It completely skips the fact that the storage for the new quorum should be 
formatted using the existing cluster id.

2) In Provisioning the KRaft controller quorum:
controller.quorum.voters=1@localhost:9093 should be 
controller.quorum.voters=3000@localhost:9093 as node.id=3000

3) When migrating brokers, it states:
# Don't set the IBP, KRaft uses "metadata.version" feature flag
# inter.broker.protocol.version=3.4

# Keep the migration enabled
zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=localhost:2181
However, if I do that, my brokers fail to restart and print:
org.apache.kafka.common.config.ConfigException: If using 
zookeeper.metadata.migration.enable in KRaft mode, zookeeper.connect must also 
be set.

4) When disabling zookeeper.metadata.migration.enable or keeping 
zookeeper.connect to get past this step, when brokers restart they print a lot 
of error messages:
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key LEADER_AND_ISR which is not enabled
[2023-02-08 12:06:25,776] ERROR Exception while processing request from 
192.168.1.11:9092-192.168.1.11:57210-107 (kafka.network.Processor) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-08 Thread via GitHub


pprovenzano commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1422710574

   
   I plan to update existing tests after bootstrap support is committed.
   I'll check on the `handleDescribeUserScramCredentialsRequest`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


viktorsomogyi commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100211704


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
 // herder is stopped. MirrorMaker has multiple herders, and having the 
herder own the close responsibility is much easier than
 // tracking the various shared admin objects in this class.
-// Do not provide a restClient to the DistributedHerder to indicate 
that request forwarding is disabled
 Herder herder = new DistributedHerder(distributedConfig, time, worker,
 kafkaClusterId, statusBackingStore, configBackingStore,
-advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, 
sharedAdmin);
+advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+restNamespace, sharedAdmin);
 herders.put(sourceAndTarget, herder);
 }
 
+private static String encodePath(String rawPath) throws 
UnsupportedEncodingException {
+return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+// Java's out-of-the-box URL encoder encodes spaces (' ') as 
pluses ('+'),
+// and pluses as '%2B'
+// But Jetty doesn't decode pluses at all and leaves them 
as-is in decoded
+// URLs
+// So to get around that, we replace pluses in the encoded URL 
here with '%20',
+// which is the encoding that Jetty expects for spaces
+// Jetty will reverse this transformation when evaluating the 
path parameters
+// and will return decoded strings with all special characters 
as they were.

Review Comment:
   Ok, I'm fine with it.
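   
   For anyone following along later: the helper under discussion boils down to 
roughly this (the actual replacement call is cut off in the excerpt above, so 
the last line is a best-guess reconstruction from the comments):
   
   ```java
   private static String encodePath(String rawPath) throws UnsupportedEncodingException {
       // Encode the path, then swap '+' (Java's encoding for a space) for "%20", which
       // Jetty decodes back to a space; literal pluses were already encoded as "%2B".
       return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
               .replace("+", "%20");
   }
   ```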



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


