[jira] [Created] (KAFKA-15893) Bump MetadataVersion for directory assignments

2023-11-24 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-15893:
---

 Summary: Bump MetadataVersion for directory assignments
 Key: KAFKA-15893
 URL: https://issues.apache.org/jira/browse/KAFKA-15893
 Project: Kafka
  Issue Type: Sub-task
Reporter: Igor Soarez






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]

2023-11-24 Thread via GitHub


s7pandey commented on code in PR #14605:
URL: https://github.com/apache/kafka/pull/14605#discussion_r1404742800


##
streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java:
##
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
+import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockFixedKeyProcessorContextTest {

Review Comment:
   Definitely! Do you think I should just add some specific FixedKeyProcessor 
tests to the MockProcessorContextAPITest file since it is mostly the same?
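
   If it helps the comparison, here is a minimal sketch of what such a test could exercise, using the classes already imported above. `MockFixedKeyProcessorContext` is the type under review in this PR, so its exact surface (e.g. a `forwarded()` accessor mirroring `MockProcessorContext`) is an assumption, not settled API:

   ```java
   import java.util.List;

   import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
   import org.apache.kafka.streams.processor.api.FixedKeyRecord;
   import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
   import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
   import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
   import org.apache.kafka.streams.processor.api.Record;

   public class FixedKeyProcessorSketch {
       public static void main(final String[] args) {
           // The mock context captures everything the processor forwards.
           final MockFixedKeyProcessorContext<String, Long> context = new MockFixedKeyProcessorContext<>();

           // A trivial fixed-key processor: increments the value, the key stays fixed.
           final FixedKeyProcessor<String, Long, Long> incrementer =
               record -> context.forward(record.withValue(record.value() + 1));

           // FixedKeyRecord has no public constructor, hence the internal factory.
           final FixedKeyRecord<String, Long> input =
               InternalFixedKeyRecordFactory.create(new Record<>("key", 41L, 0L));
           incrementer.process(input);

           // Inspect what was forwarded, mirroring MockProcessorContext's API.
           final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded();
           System.out.println(forwarded);
       }
   }
   ```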



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add retry for CI [kafka]

2023-11-24 Thread via GitHub


ex172000 commented on PR #14828:
URL: https://github.com/apache/kafka/pull/14828#issuecomment-1826221078

   > Thanks for the PR. Note that the CI configuration already sets retries; we don't need a CI server configuration.
   
   Is the CI retry a global setting? Would it make more sense to just retry the failed ones from Gradle?





Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404635960


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
 
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment:
   Done






Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14811:
URL: https://github.com/apache/kafka/pull/14811#discussion_r1404634302


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3759,6 +3760,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 CompletableFuture.completedFuture[Unit](())
   }
 
+  // Just a placeholder for now.
+  def handleListClientMetricsResources(request: RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   Just giving the option of asynchronous completion. However, since I don't 
strictly need it now, I am simplifying it to `Unit`.



##
clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Collection;
+
+/**
+ * The result of the {@link Admin#listClientMetricsResources()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListClientMetricsResourcesResult {
+    private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;
+
+    ListClientMetricsResourcesResult(KafkaFuture<Collection<ClientMetricsResourceListing>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Returns a future that yields either an exception, or the full set of client metrics
+     * listings.
+     *
+     * In the event of a failure, the future yields nothing but the first exception which
+     * occurred.
+     */
+    public KafkaFuture<Collection<ClientMetricsResourceListing>> all() {
+        final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> result = new KafkaFutureImpl<>();
+        this.future.whenComplete((listings, throwable) -> {

Review Comment:
   No. Removed.
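
   For orientation, a minimal sketch of driving this new API end to end, assuming the shapes in the diff above (the bootstrap address is illustrative, and `ClientMetricsResourceListing.name()` is inferred from the tests in this PR):

   ```java
   import java.util.Collection;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.ClientMetricsResourceListing;

   public class ListClientMetricsSketch {
       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // all() yields a KafkaFuture; get() blocks until the broker responds,
               // or completes exceptionally with the first error encountered.
               final Collection<ClientMetricsResourceListing> listings =
                   admin.listClientMetricsResources().all().get();
               listings.forEach(listing -> System.out.println(listing.name()));
           }
       }
   }
   ```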






Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14811:
URL: https://github.com/apache/kafka/pull/14811#discussion_r1404634074


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -7090,6 +7093,51 @@ private static MemberDescription 
convertToMemberDescriptions(DescribedGroupMembe
  assignment);
 }
 
+    @Test
+    public void testListClientMetricsResources() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            List<ClientMetricsResourceListing> expected = Arrays.asList(
+                new ClientMetricsResourceListing("one"),
+                new ClientMetricsResourceListing("two")
+            );
+
+            ListClientMetricsResourcesResponseData responseData =
+                new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
+
+            responseData.clientMetricsResources()
+                .add(new ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one"));
+            responseData.clientMetricsResources()
+                .add((new ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two"));
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListClientMetricsResourcesRequest,
+                new ListClientMetricsResourcesResponse(responseData));
+
+            ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources();
+            assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get()));
+        }
+    }
+
+    @Test
+    public void testListClientMetricsResourcesNotSupported() throws Exception {

Review Comment:
   That appears true to me too, but all of the other tests in that source file follow the same pattern. I am going for consistency here.






Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-11-24 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1404613145


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##
@@ -170,7 +173,30 @@ public boolean isOpen() {
 
 @Override
     public synchronized byte[] get(final Bytes key) {
-        return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        return get(key, Optional.empty());
+    }
+
+    public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
+        return get(key, Optional.of(snapshot));
+    }
+
+    private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapshot) {
+        if (snapshot.isPresent()) {
+            try (ReadOptions readOptions = new ReadOptions()) {
+                readOptions.setSnapshot(snapshot.get());
+                return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions);
+            }
+        } else {
+            return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        }
+    }
+
+    public Snapshot getSnapshot() {
+        return physicalStore.db.getSnapshot();

Review Comment:
   > I think we should push this into `RocksDBStore` class, ie, call 
`physicalStore.getSnapshot()` here, and extend `RocksDBStore` to track all open 
snapshots (similar to how we track open iterators and release all open snapshot 
if the store is closed).
   
   Since "Snapshots do not persist across DB restarts 
([ref](https://github.com/facebook/rocksdb/wiki/Snapshot))", I think we do not 
need to track them when closing the store. 
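
   For readers less familiar with the RocksDB side of this thread, a self-contained sketch of the snapshot lifecycle under discussion, using the plain RocksDB Java API (the path and keys are illustrative). Even though snapshots never survive a restart, each `getSnapshot()` pins resources until `releaseSnapshot()` is called, which is what the suggestion to track open snapshots is about:

   ```java
   import org.rocksdb.Options;
   import org.rocksdb.ReadOptions;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   import org.rocksdb.Snapshot;

   public class SnapshotSketch {
       public static void main(final String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/snapshot-sketch")) {
               db.put("k".getBytes(), "v1".getBytes());

               final Snapshot snapshot = db.getSnapshot();
               db.put("k".getBytes(), "v2".getBytes()); // write after the snapshot

               try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot)) {
                   // Prints "v1": the snapshot pins the pre-write view of the DB.
                   System.out.println(new String(db.get(readOptions, "k".getBytes())));
               } finally {
                   // Snapshots hold resources until explicitly released.
                   db.releaseSnapshot(snapshot);
               }
           }
       }
   }
   ```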






Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-11-24 Thread via GitHub


aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1404610275


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value store in case the
+            // latest record version satisfies the timestamp bound, in which case it should
+            // still be returned (i.e., the latest record version per key never expires).
+            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if present) does
+            // not satisfy the timestamp bound. return null for predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            return new VersionedRecordIteratorImpl<>(queryResults.listIterator());
+        } else {
+            // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+            final Snapshot snapshot = latestValueStore.getSnapshot();
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key, snapshot);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
+                }
+            }
+
+            // check segment stores
+            // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp}
+            // but is still valid in query specified time interval.
+            final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key, snapshot);

Review Comment:
   > I don't think we can pass the `snapshot` we got from latestValueStore into 
segmentStore -- it's two independent RocksDBs.
   
   Seems like they are the same.






Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on code in PR #14755:
URL: https://github.com/apache/kafka/pull/14755#discussion_r1404599341


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
 }
 
+    @Test
+    public void testInternalTopicExists() {
+        Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            String topic = topicBasedRlmm().config().remoteLogMetadataTopicName();
+            boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
+            Assertions.assertTrue(doesTopicExist);

Review Comment:
   Thanks for fixing this @kamalcph.






Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]

2023-11-24 Thread via GitHub


kamalcph commented on code in PR #14649:
URL: https://github.com/apache/kafka/pull/14649#discussion_r1404590243


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 leaderEpochCache.foreach(_.clearAndFlush())
 producerStateManager.truncateFullyAndStartAt(newOffset)
 logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+    if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(newOffset, LogStartOffsetIncrementReason.SegmentDeletion)

Review Comment:
   > Adding it to truncateFullyAndStartAt means that anything that calls the function does not have to worry about updating the local log start offset every time.
   
   My preference is to update the local-log-start-offset in the `truncateFullyAndStartAt` function, as it already updates the leader-epoch-checkpoint file, log-start-offset, high-watermark, and log-end-offset.
   
   To avoid passing the reason to the `maybeIncrementLocalLogStartOffset` method, we can update the `local-log-start-offset` value directly in the `truncateFullyAndStartAt` method, since it performs these operations while holding the same `lock`.
   
   We followed a similar approach in our local branch: https://sourcegraph.com/github.com/satishd/kafka@24d21694f2f0450e8a8e60e85400de3586fc7069/-/blob/core/src/main/scala/kafka/log/Log.scala?L2385






Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]

2023-11-24 Thread via GitHub


kamalcph commented on code in PR #14649:
URL: https://github.com/apache/kafka/pull/14649#discussion_r1404591766


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 leaderEpochCache.foreach(_.clearAndFlush())
 producerStateManager.truncateFullyAndStartAt(newOffset)
 logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+_localLogStartOffset = newOffset

Review Comment:
   @nikramakrishnan 
   
   Can we reuse the same approach?
   
   ```
   if (remoteLogEnabled()) _localLogStartOffset = newOffset
   ```






Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]

2023-11-24 Thread via GitHub


scholzj commented on PR #14756:
URL: https://github.com/apache/kafka/pull/14756#issuecomment-1825984878

   Thanks a lot for your help with this @mimaison and @viktorsomogyi 





[jira] [Resolved] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs

2023-11-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15464.

Fix Version/s: 3.7.0
   Resolution: Fixed

> Allow dynamic reloading of certificates with different DN / SANs
> 
>
> Key: KAFKA-15464
> URL: https://issues.apache.org/jira/browse/KAFKA-15464
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Assignee: Jakub Scholz
>Priority: Major
> Fix For: 3.7.0
>
>
> Kafka currently doesn't allow dynamic reloading of keystores when the new key 
> has a different DN or removes some of the SANs. While it might help to 
> prevent users from breaking their cluster, in some cases it would be great to 
> be able to bypass this validation when desired.
> More details are in the [KIP-978: Allow dynamic reloading of certificates 
> with different DN / 
> SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128]
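
As a sketch of the opt-in this resolves to: KIP-978 proposes two broker configuration flags for bypassing the validation, shown below. The names follow the KIP text; treat them as an assumption until verified against the 3.7.0 documentation.

```
ssl.allow.dn.changes=true
ssl.allow.san.changes=true
```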





Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]

2023-11-24 Thread via GitHub


kamalcph commented on code in PR #14755:
URL: https://github.com/apache/kafka/pull/14755#discussion_r1404579784


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
 }
 
+    @Test
+    public void testInternalTopicExists() {
+        Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            String topic = topicBasedRlmm().config().remoteLogMetadataTopicName();
+            boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
+            Assertions.assertTrue(doesTopicExist);

Review Comment:
   Opened #14840 to fix this test. Ran the test locally 100 times to ensure that it does not fail this time:
   
   (screenshot: https://github.com/apache/kafka/assets/11411249/3912af3d-269f-4a34-966f-4a9096b49f30)
   
   PTAL






[PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]

2023-11-24 Thread via GitHub


kamalcph opened a new pull request, #14840:
URL: https://github.com/apache/kafka/pull/14840

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404560191


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
 
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment:
   I did consider that. I have already endured the annoyance. I'll take a look.






[jira] [Created] (KAFKA-15892) Flaky test: testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2023-11-24 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15892:
-

 Summary: Flaky test: testAlterSinkConnectorOffsets – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
 Key: KAFKA-15892
 URL: https://issues.apache.org/jira/browse/KAFKA-15892
 Project: Kafka
  Issue Type: Bug
Reporter: Apoorv Mittal


h4. Error
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
alter connector offsets. Error response: \{"error_code":500,"message":"Failed 
to alter consumer group offsets for connector test-connector either because its 
tasks haven't stopped completely yet or the connector was resumed before the 
request to alter its offsets could be successfully completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the Connect cluster may need to be restarted to get rid of 
the zombie sink tasks."}
h4. Stacktrace
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
alter connector offsets. Error response: \{"error_code":500,"message":"Failed 
to alter consumer group offsets for connector test-connector either because its 
tasks haven't stopped completely yet or the connector was resumed before the 
request to alter its offsets could be successfully completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the Connect cluster may need to be restarted to get rid of 
the zombie sink tasks."}
 at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.alterConnectorOffsets(EmbeddedConnect.java:614)
 at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.alterConnectorOffsets(EmbeddedConnectCluster.java:48)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:363)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets(OffsetsApiIntegrationTest.java:287)
 at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
 at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
 at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)





[jira] [Created] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2023-11-24 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15891:
-

 Summary: Flaky test: 
testResetSinkConnectorOffsetsOverriddenConsumerGroupId – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
 Key: KAFKA-15891
 URL: https://issues.apache.org/jira/browse/KAFKA-15891
 Project: Kafka
  Issue Type: Bug
Reporter: Apoorv Mittal


h4. Error
org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
h4. Stacktrace
org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
 at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
 at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725)
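
For reference, the failing assertion is the standard polling pattern from org.apache.kafka.test.TestUtils; a hedged sketch follows (the helper names committedOffsets/endOffsets are hypothetical stand-ins for the test's actual offset lookups):

```java
import org.apache.kafka.test.TestUtils;

public class WaitForConditionSketch {
    // Produces exactly the failure shape above when the condition never becomes
    // true within maxWaitMs: an AssertionFailedError carrying the
    // conditionDetails message and "expected: <true> but was: <false>".
    static void verifyOffsetsCatchUp() throws InterruptedException {
        TestUtils.waitForCondition(
            () -> committedOffsets("test-connector").equals(endOffsets("test-topic")),
            30_000L, // maxWaitMs
            "Sink connector consumer group offsets should catch up to the topic end offsets");
    }

    // Hypothetical stand-ins so the sketch compiles.
    static Object committedOffsets(final String connector) { return new Object(); }
    static Object endOffsets(final String topic) { return new Object(); }
}
```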





Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on code in PR #14755:
URL: https://github.com/apache/kafka/pull/14755#discussion_r1404552551


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##
@@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
 return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
 }
 
+    @Test
+    public void testInternalTopicExists() {
+        Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            String topic = topicBasedRlmm().config().remoteLogMetadataTopicName();
+            boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
+            Assertions.assertTrue(doesTopicExist);

Review Comment:
   @kamalcph Can we please verify the flakiness of the test introduced in this PR? I can see the test failing intermittently on PR builds; one PR build run which reports this test failing: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14632/26/tests/
   
   I have tried to reproduce locally by adding `@RepeatedTest(100)` to the test, and can reproduce. Can we please either revert this test or fix it?
   
   (screenshot: https://github.com/apache/kafka/assets/2861565/105ae973-7b8e-4f40-9e6a-d34320eb9347)
   
   
   cc: @satishd
   
   @junrao - Found this while investigating build test failures for PRs in 
review.






Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1825923669

   New failing - 12
   - Build / JDK 11 and Scala 2.13 / testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (12s)
   - Build / JDK 11 and Scala 2.13 / testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (51s)
   - Build / JDK 11 and Scala 2.13 / testResetSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (58s)
   - Build / JDK 21 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest (1m 50s)
   - Build / JDK 21 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest (1m 42s)
   - Build / JDK 21 and Scala 2.13 / shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest (1m 9s)
   - Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
   - Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
   - Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest (54s)
   - Build / JDK 8 and Scala 2.12 / testAlwaysSendsAccumulatedOfflineDirs() – kafka.server.BrokerLifecycleManagerTest (<1s)
   - Build / JDK 8 and Scala 2.12 / testInternalTopicExists() – org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest (12s)
   - Build / JDK 8 and Scala 2.12 / testListTopicsWithExcludeInternal(String).quorum=zk – org.apache.kafka.tools.TopicCommandIntegrationTest: https://issues.apache.org/jira/browse/KAFKA-15140





Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404537279


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
 
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment:
   Adding this change to all the tests is a bit annoying. Have you considered 
adding this to `IntegrationTestHarness.doSetup` or in `createConsumer`? We 
could infer it like we did with `isNewGroupCoordinatorEnabled()` in the same 
class.






Re: [PR] KAFKA-14412: Add ProcessingThread tag interface [kafka]

2023-11-24 Thread via GitHub


nicktelford commented on PR #14839:
URL: https://github.com/apache/kafka/pull/14839#issuecomment-1825885812

   This is part of KIP-892, and has been broken out into a separate PR to 
reduce the review burden on the main KIP-892 implementation, since it can be 
merged independently.
   
   Note: KIP-892 requires this, because under `READ_COMMITTED`, `RocksDBStore` 
needs to know whether to return records from the current transaction buffer 
(for processing threads), or whether to return only records committed to the 
underlying RocksDB database (interactive query threads).
   
   Previously, this was done with an `instanceof StreamThread` check, but with 
the experimental `TaskExecutorThread` implementation in `trunk`, we need some 
way to abstract this.
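
   For readers outside KIP-892, a minimal sketch of the tag-interface pattern being proposed (the names here are stand-ins, not the PR's actual classes):

   ```java
   // Marker (tag) interface: no methods, purely a capability label.
   interface ProcessingThread { }

   // Stand-in for StreamThread / DefaultTaskExecutor.TaskExecutorThread, both of
   // which would implement the marker in the real PR.
   class StandInStreamThread extends Thread implements ProcessingThread {
       StandInStreamThread(final Runnable r) { super(r); }
   }

   public class ProcessingThreadSketch {
       // What a store like RocksDBStore can ask without referencing the
       // private TaskExecutorThread class directly:
       static boolean calledFromProcessingThread() {
           return Thread.currentThread() instanceof ProcessingThread;
       }

       public static void main(final String[] args) throws InterruptedException {
           System.out.println(calledFromProcessingThread()); // false: main thread
           final Thread t = new StandInStreamThread(
               () -> System.out.println(calledFromProcessingThread())); // true
           t.start();
           t.join();
       }
   }
   ```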





[PR] KAFKA-14412: Add ProcessingThread tag interface [kafka]

2023-11-24 Thread via GitHub


nicktelford opened a new pull request, #14839:
URL: https://github.com/apache/kafka/pull/14839

   This interface provides a common supertype for `StreamThread` and 
`DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892 to 
differentiate between "processing" threads and interactive query threads.
   
   This is needed because `DefaultTaskExecutor.TaskExecutorThread` is 
`private`, so cannot be seen directly from `RocksDBStore`.
   





Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-11-24 Thread via GitHub


soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1825859013

   @cmccabe @pprovenzano @rondagostino PTAL





Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-11-24 Thread via GitHub


soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1825858721

   This builds on #14820  - KAFKA-15886: Always specify directories for new 
partition registrations – so this is marked as draft until #14820 is merged.
   
   **Reviews**: please focus on the second commit, titled "[KAFKA-15361: 
Process and persist dir info with broker 
registration](https://github.com/apache/kafka/pull/14838/commits/aa945193cc705df6e6e904b5ab8a9ecfeca4a38b)"





[PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-11-24 Thread via GitHub


soarez opened a new pull request, #14838:
URL: https://github.com/apache/kafka/pull/14838

   Controllers should process and persist directory information from the broker 
registration request
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]

2023-11-24 Thread via GitHub


viktorsomogyi merged PR #14756:
URL: https://github.com/apache/kafka/pull/14756





Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]

2023-11-24 Thread via GitHub


viktorsomogyi commented on PR #14756:
URL: https://github.com/apache/kafka/pull/14756#issuecomment-1825833128

   There were failing unit tests, but when rerun locally from the PR they all passed, so they're likely unrelated flakies. Merging the PR, thanks @scholzj for the contribution!





Re: [PR] KAFKA-15801: improve Kafka broker/NetworkClient logging for connectiv… [kafka]

2023-11-24 Thread via GitHub


ijuma commented on code in PR #14799:
URL: https://github.com/apache/kafka/pull/14799#discussion_r1404471013


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -855,10 +855,11 @@ private void handleTimedOutConnections(List<ClientResponse> responses, long now)
         List<String> nodes = connectionStates.nodesWithConnectionSetupTimeout(now);
         for (String nodeId : nodes) {
             this.selector.close(nodeId);
-            log.info(
-                "Disconnecting from node {} due to socket connection setup timeout. " +
+            log.warn(

Review Comment:
   Why is this a warning? It is expected to happen in a number of normal situations (like cluster rolls). The additional information seems useful.






Re: [PR] MINOR: Add retry for CI [kafka]

2023-11-24 Thread via GitHub


ijuma commented on PR #14828:
URL: https://github.com/apache/kafka/pull/14828#issuecomment-1825825013

   Thanks for the PR. Note that the CI configuration already sets retries; we don't need a CI server configuration.





Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on code in PR #14770:
URL: https://github.com/apache/kafka/pull/14770#discussion_r1404379306


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+  def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
-      .setErrorCode(Errors.NOT_CONTROLLER.code())))
-    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
-    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
 
-    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    offlineDirs.foreach(manager.propagateDirectoryFailure)
-
-    // start the manager late to prevent a race, and force expectations on the first heartbeat
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
-
     poll(ctx, manager, registration)
-    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
-    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
-    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
 
-    assertEquals(offlineDirs, dirs1.asScala.toSet)
-    assertEquals(offlineDirs, dirs2.asScala.toSet)
-    assertEquals(Set.empty, dirs3.asScala.toSet)
+    manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    poll(ctx, manager, heartbeats(0)).data()
+    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
+
+    manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    poll(ctx, manager, heartbeats(2)).data()
+    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
+
+    manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
+    poll(ctx, manager, heartbeats(4)).data()
+    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()

Review Comment:
   Thanks for fixing this @soarez.






Re: [PR] KAFKA-15154: Acquire lock when reading checkQuotas [kafka]

2023-11-24 Thread via GitHub


mstepan commented on PR #13969:
URL: https://github.com/apache/kafka/pull/13969#issuecomment-1825678364

   I would suggest using StampedLock instead of just the synchronized keyword here.
   Something like https://github.com/apache/kafka/pull/14837/commits
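
   For illustration, a sketch of the optimistic-read pattern StampedLock enables, as a simplified stand-in for the Sensor use case (not the actual PR code):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.locks.StampedLock;

   public class StampedLockSketch {
       private final StampedLock lock = new StampedLock();
       private final List<String> metrics = new ArrayList<>();

       void add(final String metric) {
           final long stamp = lock.writeLock(); // exclusive, like today's synchronized
           try {
               metrics.add(metric);
           } finally {
               lock.unlockWrite(stamp);
           }
       }

       int checkQuotas() {
           // Optimistic read: no blocking at all unless a writer intervened.
           long stamp = lock.tryOptimisticRead();
           int size = metrics.size();
           if (!lock.validate(stamp)) {
               // A concurrent write happened; fall back to a real read lock.
               stamp = lock.readLock();
               try {
                   size = metrics.size();
               } finally {
                   lock.unlockRead(stamp);
               }
           }
           return size;
       }
   }
   ```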





[jira] [Commented] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas

2023-11-24 Thread Maksym Stepanenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789452#comment-17789452
 ] 

Maksym Stepanenko commented on KAFKA-15154:
---

I believe that instead of just adding a synchronization block, it would be much 
better to introduce something like a read-write lock (or StampedLock). 
Like this: https://github.com/apache/kafka/pull/14837/commits

> Potential bug: We don't acquire lock when reading checkQuotas
> -
>
> Key: KAFKA-15154
> URL: https://issues.apache.org/jira/browse/KAFKA-15154
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Major
>  Labels: newbie
>
> At Sensor.java line 254, we call `this.metrics.values()`. metrics is not a 
> thread-safe map, which is why we acquire a lock whenever we add or remove 
> entries from it; for example, see the add() and hasMetrics() methods.
> However, we don't acquire a lock when calling Sensor#checkQuotas(timeMs).
> This could lead to a situation where the metrics map is observed in an 
> inconsistent state, since it is not safe for concurrent read/write access.
> The objective of this task is to validate that the above is correct and, if 
> so, fix the situation by enclosing this read in a lock. As a stretch task, 
> we should consider replacing the metrics data structure with one that 
> allows concurrent reads but exclusive writes.
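
For context, the minimal fix the ticket asks for would look roughly like this (a sketch with simplified stand-ins, not the actual Sensor code):

    import java.util.HashMap;
    import java.util.Map;

    class SensorSketch {
        // Not a thread-safe map: writers already synchronize on `this` elsewhere.
        private final Map<String, Double> metrics = new HashMap<>();

        synchronized void add(String name, double value) {
            metrics.put(name, value);
        }

        // The proposed fix: take the same monitor before iterating, so the
        // read never observes the map mid-update.
        void checkQuotas(long timeMs) {
            synchronized (this) {
                for (double value : metrics.values()) {
                    // ... evaluate each metric value against its quota at timeMs ...
                }
            }
        }
    }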



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15154: proper StampedLock synchronization added [kafka]

2023-11-24 Thread via GitHub


mstepan opened a new pull request, #14837:
URL: https://github.com/apache/kafka/pull/14837

   All read and write access to the Sensor `metrics` field should be properly synchronized according to the Java Memory Model (JMM).
   The best way to separate reads from writes is to use a ReadWriteLock or the more advanced StampedLock.
   
   All tests are GREEN.
   
   ### Committer Checklist (excluded from commit message)
   - [v] Verify design and implementation 
   - [v] Verify test coverage and CI build status
   - [v] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]

2023-11-24 Thread via GitHub


soarez commented on code in PR #14770:
URL: https://github.com/apache/kafka/pull/14770#discussion_r1404314260


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+  def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
-      .setErrorCode(Errors.NOT_CONTROLLER.code())))
-    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
-    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
 
-    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    offlineDirs.foreach(manager.propagateDirectoryFailure)
-
-    // start the manager late to prevent a race, and force expectations on the first heartbeat
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
-
     poll(ctx, manager, registration)
-    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
-    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
-    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
 
-    assertEquals(offlineDirs, dirs1.asScala.toSet)
-    assertEquals(offlineDirs, dirs2.asScala.toSet)
-    assertEquals(Set.empty, dirs3.asScala.toSet)
+    manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    poll(ctx, manager, heartbeats(0)).data()
+    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
+
+    manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    poll(ctx, manager, heartbeats(2)).data()
+    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
+
+    manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
+    poll(ctx, manager, heartbeats(4)).data()
+    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()

Review Comment:
   Thanks for reporting this Apoorv. Please see #14836



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-24 Thread via GitHub


soarez commented on PR #14836:
URL: https://github.com/apache/kafka/pull/14836#issuecomment-1825622434

   Flakiness detected since #14770
   
   cc @apoorvmittal10 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]

2023-11-24 Thread via GitHub


soarez opened a new pull request, #14836:
URL: https://github.com/apache/kafka/pull/14836

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1404286606


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +388,9 @@ public enum Errors {
     STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new),
     MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),
     UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),
-    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new);
+    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
+    UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new),

Review Comment:
   Done, thanks for pointing out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield opened a new pull request, #14835:
URL: https://github.com/apache/kafka/pull/14835

   In the new consumer, `Consumer.poll(Duration timeout)` blocks for the entire duration. If the consumer is joining a group and has not yet received its assignment, the poll begins before any partitions have been assigned. Because the poll is blocked, it does not notice when partitions are assigned, and consequently it does not return any records. The old consumer only blocks for the duration of the heartbeat interval and loops until the poll timeout has passed, and is thus able to check for newly received assignments. A sketch of that looping approach follows.
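
   A rough illustration of the bounded-wait loop, with assumed stand-ins for the client internals (hasAssignment, fetchRecords and networkPoll are hypothetical helpers, not the real client API):

       import java.time.Duration;
       import java.util.Collections;
       import java.util.List;

       class BoundedWaitPollSketch {
           private static final long HEARTBEAT_INTERVAL_MS = 3000;

           // Illustrative stand-ins for client internals.
           private boolean hasAssignment() { return false; }
           private List<String> fetchRecords() { return Collections.emptyList(); }
           private void networkPoll(long timeoutMs) { /* wait for network events up to timeoutMs */ }

           // Wait in heartbeat-sized slices and re-check for assigned partitions
           // between waits, instead of blocking once for the whole poll timeout.
           List<String> poll(Duration timeout) {
               long deadline = System.currentTimeMillis() + timeout.toMillis();
               do {
                   long remaining = deadline - System.currentTimeMillis();
                   networkPoll(Math.max(0, Math.min(remaining, HEARTBEAT_INTERVAL_MS)));
                   if (hasAssignment()) {
                       List<String> records = fetchRecords();
                       if (!records.isEmpty()) {
                           return records;
                       }
                   }
               } while (System.currentTimeMillis() < deadline);
               return Collections.emptyList();
           }
       }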
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1404253943


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +388,9 @@ public enum Errors {
     STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new),
     MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),
     UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),
-    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new);
+    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
+    UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new),

Review Comment:
   Tiny change request. Please put a full stop at the end of the error strings. 
`"Client sent a push telemetry request with an invalid or outdated subscription 
ID."`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404233938


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"))

Review Comment:
   OK. Nice and easy to change now that I've refactored it. I'll get on it :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404226250


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"))

Review Comment:
   We likely need it here too.



##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))
Review Comment:
   We also need to test `generic` with `kraft+kip848`. This is what all the tests with `kraft+kip848` did prior to your change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1825499807

   This PR now reflects the changes in KAFKA-14781 and also tests the new 
consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404213789


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 1)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 1
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = 

Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404208505


##
core/src/test/scala/kafka/utils/TestInfoUtils.scala:
##
@@ -39,6 +39,12 @@ object TestInfoUtils {
       } else {
         throw new RuntimeException(s"Unknown quorum value")
       }
+    } else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment:
   I've replaced this in the new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404207856


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   This has been replaced with a `MethodSource` that is capable of returning 
whatever combination we want.
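
   For anyone unfamiliar with the pattern, a `MethodSource` lets a static factory method decide exactly which parameter combinations run. A generic JUnit 5 sketch in Java (not the Scala test code itself; names are illustrative):

       import java.util.stream.Stream;

       import org.junit.jupiter.params.ParameterizedTest;
       import org.junit.jupiter.params.provider.Arguments;
       import org.junit.jupiter.params.provider.MethodSource;

       import static org.junit.jupiter.api.Assertions.assertNotNull;

       class QuorumProtocolTest {

           // The factory method returns exactly the combinations we want to run.
           static Stream<Arguments> quorumAndGroupProtocol() {
               return Stream.of(
                   Arguments.of("zk", "generic"),
                   Arguments.of("kraft", "generic"),
                   Arguments.of("kraft+kip848", "consumer"));
           }

           @ParameterizedTest
           @MethodSource("quorumAndGroupProtocol")
           void runsForEachCombination(String quorum, String groupProtocol) {
               assertNotNull(quorum);
               assertNotNull(groupProtocol);
           }
       }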



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1404037483


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -691,6 +708,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
             }
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+            throwIfNull(request.instanceId(), "InstanceId can't be null for Static Member. GroupId: "

Review Comment:
   I would rather use `InstanceId can't be null.` here in order to be 
consistent with the other error messages.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -751,6 +772,53 @@ private void throwIfConsumerGroupIsFull(
         }
     }
 
+    /**
+     * Validates and throws an error when the validation fails for static member.
+     * @param groupId     The group id
+     * @param instanceId  The instance id
+     * @param member      The existing static member in the group.
+     * @param memberEpoch The member epoch with which the static member sends heartbeat.
+     * @param memberId    The member id with which the member joins now.
+     *
+     * @throws UnknownMemberIdException if member sends heartbeat with a non-zero epoch and no static member exists for
+     * the instance id.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException If member joins with non-zero epoch but there
+     * already exists a static member with a different memberId.
+     * @throws org.apache.kafka.common.errors.UnreleasedInstanceIdException A new member is trying to leave the group
+     * but the existing static member hasn't requested leaving the group.
+
+     */
+    private void throwIfStaticMemberValidationFails(

Review Comment:
   I suppose that we could remove this one now. Could we?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it should replace
+                    // the previous instance iff the previous member had sent a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
+                }
+            } else {
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(memberId, instanceId, member);
Review Comment:
   nit: I would put `member` as the first argument to be consistent with the 
other helpers.



##

Re: [PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]

2023-11-24 Thread via GitHub


lucasbru merged PR #14817:
URL: https://github.com/apache/kafka/pull/14817


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]

2023-11-24 Thread via GitHub


lucasbru commented on PR #14817:
URL: https://github.com/apache/kafka/pull/14817#issuecomment-1825403240

   Test failures unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13531: Disable flaky NamedTopologyIntegrationTest [kafka]

2023-11-24 Thread via GitHub


lucasbru commented on PR #14830:
URL: https://github.com/apache/kafka/pull/14830#issuecomment-1825399087

   Test failures are unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13531: Disable flaky NamedTopologyIntegrationTest [kafka]

2023-11-24 Thread via GitHub


lucasbru merged PR #14830:
URL: https://github.com/apache/kafka/pull/14830


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available

2023-11-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15817:
---
Fix Version/s: (was: 3.6.1)

> Avoid reconnecting to the same IP address if multiple addresses are available
> -
>
> Key: KAFKA-15817
> URL: https://issues.apache.org/jira/browse/KAFKA-15817
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Major
> Fix For: 3.7.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS 
> resolution behavior for clients to re-resolve DNS after disconnecting from a 
> broker, rather than waiting until we had iterated over all addresses from a 
> given resolution. This is useful when the IP addresses have changed between 
> the connection and the disconnection.
> However, with the behavior change, this does mean that clients could 
> potentially reconnect immediately to the same IP they just disconnected from, 
> if the IPs have not changed. In cases where the disconnection happened 
> because that IP was unhealthy (such as a case where a load balancer has 
> instances in multiple availability zones and one zone is unhealthy, or a case 
> where an intermediate component in the network path is going through a 
> rolling restart), this will delay the client from reconnecting successfully. 
> To address this, clients should remember the IP they just disconnected from 
> and skip that IP when reconnecting, as long as the hostname resolved to 
> multiple addresses.
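
A hypothetical sketch of that idea (illustrative only, not the actual client networking code):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ReconnectAddressPicker {
        private InetAddress lastFailedAddress;

        // Prefer an address other than the one we just disconnected from, but
        // only when resolution returned more than one address.
        InetAddress pickAddress(String host) throws UnknownHostException {
            List<InetAddress> resolved = new ArrayList<>(Arrays.asList(InetAddress.getAllByName(host)));
            if (resolved.size() > 1 && lastFailedAddress != null) {
                resolved.remove(lastFailedAddress);
            }
            return resolved.get(0);
        }

        void recordDisconnect(InetAddress address) {
            lastFailedAddress = address;
        }
    }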



--
This message was sent by Atlassian Jira
(v8.20.10#820010)