Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]

2023-11-17 Thread via GitHub


divijvaidya commented on PR #14788:
URL: https://github.com/apache/kafka/pull/14788#issuecomment-1817397316

   This is a public-facing change. Is this part of a KIP?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-17 Thread via GitHub


philipnee commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1817393641

   Thanks @lucasbru 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-17 Thread via GitHub


hanyuzheng7 commented on PR #14570:
URL: https://github.com/apache/kafka/pull/14570#issuecomment-1817349035

   @mjsax I have already updated the javadoc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


jolshan commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
 assertEquals(5, recordsToTest.get(1).offset());
 }
 
+/**
+ * KAFKA-15836:
+ * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+ * fetched records are not aligned on max.poll.records boundaries.
+ *
+ * tp0 has records 1,2,3; tp1 has records 6,7,8
+ * max.poll.records is 2
+ * 
+ * poll 1 should return 1,2
+ * poll 2 should return 3,6
+ * poll 3 should return 7,8
+ * 
+ * Or similar :)
+ */
+@Test
+public void testFetchMaxPollRecordsUnaligned() {
+buildFetcher(2);
+
+Set<TopicPartition> tps = new HashSet<>();
+tps.add(tp0);
+tps.add(tp1);
+assignFromUser(tps);
+subscriptions.seek(tp0, 1);
+subscriptions.seek(tp1, 6);
+
+client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
+
+assertEquals(1, sendFetches());
+consumerClient.poll(time.timer(0));

Review Comment:
   It's a little confusing to reuse the recordsByPartition and fetchedRecords 
values on each call. I wonder if there is a way to modularize this, or make it 
clearer that things are being reset, and avoid the code duplication.
   
   I'm also wondering if we could have comments on each of these cases describing what 
we expect to be happening.
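   
   One possible shape for that, as a rough sketch (not part of the PR; it assumes the 
test's existing consumerClient/time fields, the fetchedRecords() helper, and the usual 
test imports such as java.util.stream.Collectors): a small assertion helper so that each 
max.poll.records "page" becomes one self-describing call.
   
       private void assertPollReturnsOffsets(final Map<TopicPartition, List<Long>> expected) {
           consumerClient.poll(time.timer(0));
           final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetchedRecords();
           // each call drains exactly one max.poll.records "page", so assert on the whole map
           assertEquals(expected.keySet(), fetched.keySet());
           expected.forEach((tp, offsets) -> assertEquals(offsets,
               fetched.get(tp).stream().map(ConsumerRecord::offset).collect(Collectors.toList())));
       }
   
       // poll 1: assertPollReturnsOffsets(mkMap(mkEntry(tp0, asList(1L, 2L))));
       // poll 2: assertPollReturnsOffsets(mkMap(mkEntry(tp0, singletonList(3L)), mkEntry(tp1, singletonList(6L))));
       // poll 3: assertPollReturnsOffsets(mkMap(mkEntry(tp1, asList(7L, 8L))));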



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


jolshan commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
 assertEquals(5, recordsToTest.get(1).offset());
 }
 
+/**
+ * KAFKA-15836:
+ * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+ * fetched records are not aligned on max.poll.records boundaries.
+ *
+ * tp0 has records 1,2,3; tp1 has records 6,7,8
+ * max.poll.records is 2
+ * 
+ * poll 1 should return 1,2
+ * poll 2 should return 3,6
+ * poll 3 should return 7,8
+ * 
+ * Or similar :)
+ */
+@Test
+public void testFetchMaxPollRecordsUnaligned() {
+buildFetcher(2);
+
+Set<TopicPartition> tps = new HashSet<>();
+tps.add(tp0);
+tps.add(tp1);
+assignFromUser(tps);
+subscriptions.seek(tp0, 1);
+subscriptions.seek(tp1, 6);
+
+client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
+
+assertEquals(1, sendFetches());
+consumerClient.poll(time.timer(0));

Review Comment:
   It's a little confusing to reuse the recordsByPartition and fetchedRecords 
values on each call. I wonder if there is a way to modularize this, or make it 
clearer that things are being reset.
   
   I'm also wondering if we could have comments on each of these cases for what 
we expect to be happening.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


jolshan commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
 assertEquals(5, recordsToTest.get(1).offset());
 }
 
+/**
+ * KAFKA-15836:
+ * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+ * fetched records are not aligned on max.poll.records boundaries.
+ *
+ * tp0 has records 1,2,3; tp1 has records 6,7,8
+ * max.poll.records is 2
+ * 
+ * poll 1 should return 1,2
+ * poll 2 should return 3,6
+ * poll 3 should return 7,8
+ * 
+ * Or similar :)
+ */
+@Test
+public void testFetchMaxPollRecordsUnaligned() {
+buildFetcher(2);
+
+Set<TopicPartition> tps = new HashSet<>();
+tps.add(tp0);
+tps.add(tp1);
+assignFromUser(tps);
+subscriptions.seek(tp0, 1);
+subscriptions.seek(tp1, 6);
+
+client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
+
+assertEquals(1, sendFetches());
+consumerClient.poll(time.timer(0));

Review Comment:
   It's a little confusing to reuse the recordsByPartition and fetchedRecords 
values on each call. I wonder if there is a way to modularize this, or make it 
clearer that things are being reset.
   
   I'm also wondering if we could have comments on each of these cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]

2023-11-17 Thread via GitHub


gharris1727 opened a new pull request, #14795:
URL: https://github.com/apache/kafka/pull/14795

   Jenkins truncates stdout/stderr from tests which exceed 100,000 bytes. This 
truncation is computed once per-suite, meaning that each suite gets a 100kb 
budget for logs, and suites that log too much have the middle of the log 
truncated. This unfairly discards complete logs for tests in the middle of the 
suite, while keeping logs from the beginning and end of the suite.
   
   If a failure occurs in a single test in the middle of a suite, the relevant 
logs may be completely elided, making investigation of the failure more 
difficult. This has made debugging with the CI logging almost completely 
ineffective, as the relevant logs are often swallowed by Jenkins, and 
irrelevant logs are shown.
   
   Instead, we can enable this feature in the Gradle JunitXmlReport: 
https://docs.gradle.org/current/javadoc/org/gradle/api/tasks/testing/JUnitXmlReport.html#setOutputPerTestCase-boolean-
   This changes the way that stdout/stderr is embedded in the XML report, 
separating the output for each test into different xml tags. This may enable 
Jenkins to perform truncation on a per-test basis, so that each test in a suite 
gets a fair distribution of the logging budget.
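   
   As a rough sketch, the configuration change boils down to something like this in the 
Gradle test setup (illustrative only; the exact block in Kafka's build.gradle may differ):
   
       tasks.withType(Test).configureEach {
           reports {
               junitXml {
                   // embed stdout/stderr per test case instead of per suite
                   outputPerTestCase = true
               }
           }
       }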
   
   This could increase the size of the logs persisted by Jenkins: each suite 
is currently capped at 100kb, but after this change you could receive N*100kb 
of logs overall if there are N tests in the suite. However, it appears that 
Jenkins cannot show stdout for passing tests, so it probably isn't capturing 
it (I couldn't find a configuration which would confirm this). If this is true, 
the size of the logs persisted will only increase for test failures, which is 
exactly when the additional logs would be useful.
   
   This change may also reduce the total amount of logs captured, since logs 
from tests that passed won't be kept when another test in the same suite fails. 
Regardless, the more effective usage of the logging budget will be beneficial 
even if the total amount of logs persisted increases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398012157


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+
+public class OuterStreamJoinStoreFactory extends 
AbstractConfigurableStoreFactory {
+
+private final String name;
+private final StreamJoinedInternal streamJoined;
+private final JoinWindows windows;
+private boolean loggingEnabled;
+
+public enum Type {
+RIGHT,
+LEFT
+}
+
+public OuterStreamJoinStoreFactory(
+final String name,
+final StreamJoinedInternal streamJoined,
+final JoinWindows windows,
+final Type type
+) {
+super(streamJoined.dslStoreSuppliers());
+this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + 
"-store";
+this.streamJoined = streamJoined;
+this.windows = windows;
+this.loggingEnabled = streamJoined.loggingEnabled();
+}
+
+@Override
+public StateStore build() {
+final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
+final Duration windowSize = Duration.ofMillis(windows.size());
+final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be 
negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the 
window store "
++ name + " must be no smaller than its window size. Got 
size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde 
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
+final LeftOrRightValueSerde leftOrRightValueSerde = new 
LeftOrRightValueSerde<>(streamJoined.valueSerde(), 
streamJoined.otherValueSerde());
+
+// TODO: we should allow for configuration of this store explicitly 
instead of assuming that it should
+// share the same type of store as thisStoreSupplier
+final boolean useInMemoryStore = streamJoined.thisStoreSupplier() != 
null
+&& streamJoined.thisStoreSupplier() instanceof 
InMemoryWindowBytesStoreSupplier;
+final KeyValueBytesStoreSupplier supplier = useInMe

Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398006947


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1038,13 +1038,15 @@ public void 
shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
 assertThrows(ConfigException.class, () -> new StreamsConfig(props));
 }
 
+@SuppressWarnings("deprecation")
 @Test
 public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
 final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
 final String actualDefaultStoreType = 
streamsConfig.getString(DEFAULT_DSL_STORE_CONFIG);
 assertEquals("default.dsl.store should be \"rocksDB\"", 
expectedDefaultStoreType, actualDefaultStoreType);
 }
 
+@SuppressWarnings("deprecation")
 @Test
 public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {

Review Comment:
   Can you add some basic tests for the new store config? Basically something 
like what we have for the old configs -- ie that the default is rocksdb and that 
the built-in InMemoryDslStoreSuppliers can be selected -- but instead of the bad-enum one, just 
make sure you can actually pass in a custom implementation.
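   
   Something along these lines, perhaps (just a sketch -- the config constant and the idea 
that custom suppliers are passed as a class are assumptions based on KIP-954, so adjust to 
whatever the PR actually exposes):
   
       @Test
       public void shouldAllowSettingCustomDslStoreSuppliers() {
           // MyCustomDslStoreSuppliers is a hypothetical test implementation of DslStoreSuppliers
           props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, MyCustomDslStoreSuppliers.class);
           final StreamsConfig config = new StreamsConfig(props);
           assertEquals(MyCustomDslStoreSuppliers.class,
               config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG));
       }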



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398005349


##
streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final Duration windowSize;
+private final boolean retainDuplicates;
+private final EmitStrategy emitStrategy;
+private final boolean isSlidingWindow;
+
+/**
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod  length of time to retain data in the store 
(cannot be negative)
+ * (note that the retention period must be at 
least long enough to contain the
+ * windowed data's entire life cycle, from 
window-start through window-end,
+ * and for the entire grace period)

Review Comment:
   Actually IIUC users should never have to instantiate these classes directly, 
so I guess it's not that critical to define the specific constraints and ok to 
describe it conceptually. Or both 🤷‍♀️ 



##
streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final Duration windowSize;
+private final boolean retainDuplicates;
+private final EmitStrategy emitStrategy;
+private final boolean isSlidingWindow;
+
+/**
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod  length of time to retain data in the store 
(cannot be negative)
+ * (note that the retention period must be at 
least long enough to contain the
+ * windowed data's entire life cycle, from 
window-start through window-end,
+ * and for the entire grace period)

Review Comment:
   Actually IIUC users should never have to instantiate these classes directly, 
so I guess it's not that critical to define the specific constraints and ok to 
describe it conceptually. Or both 🤷‍♀️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398004582


##
streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final Duration windowSize;
+private final boolean retainDuplicates;
+private final EmitStrategy emitStrategy;
+private final boolean isSlidingWindow;
+
+/**
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod  length of time to retain data in the store 
(cannot be negative)
+ * (note that the retention period must be at 
least long enough to contain the
+ * windowed data's entire life cycle, from 
window-start through window-end,
+ * and for the entire grace period)

Review Comment:
   this is kind of a roundabout way of saying it must be at least equal to the 
windowSize + gracePeriod. I think it's better to just explicitly state this and 
put the constraint in concrete terms relative to other known quantities.
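   
   In other words, something like the following phrasing/check (illustrative only, not 
actual Kafka code):
   
       // the concrete constraint: retentionPeriod >= windowSize + gracePeriod
       if (retentionPeriod.compareTo(windowSize.plus(gracePeriod)) < 0) {
           throw new IllegalArgumentException("retentionPeriod must be at least windowSize + gracePeriod");
       }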



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398003882


##
streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.util.Map;
+import org.apache.kafka.common.Configurable;
+
+/**
+ * {@code DslStoreSuppliers} defines a grouping of factories to construct
+ * stores for each of the types of state store implementations in Kafka
+ * Streams. This allows configuration of a default store supplier beyond
+ * the built-in defaults of RocksDB and In-Memory.

Review Comment:
   I think we should be a bit more explicit about how this gets used, see 
https://github.com/apache/kafka/pull/14648/files#r1397992729



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398003330


##
streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslSessionParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
+ */
+public class DslSessionParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final EmitStrategy emitStrategy;
+
+/**
+ * @param name  name of the store (cannot be {@code null})
+ * @param retentionPeriod   length of time to retain data in the store 
(cannot be negative)
+ *  (note that the retention period must be at least long enough to
+ *  contain the inactivity gap of the session and the entire grace period.)
+ * @param emitStrategy  defines how to emit results
+ */
+public DslSessionParams(
+final String name,
+final Duration retentionPeriod,
+final EmitStrategy emitStrategy
+) {
+Objects.requireNonNull(name);
+this.name = name;
+this.retentionPeriod = retentionPeriod;
+this.emitStrategy = emitStrategy;
+}
+
+public String name() {
+return name;
+}
+
+public Duration retentionPeriod() {
+return retentionPeriod;
+}
+
+public EmitStrategy emitStrategy() {
+return emitStrategy;
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final DslSessionParams that = (DslSessionParams) o;
+return Objects.equals(name, that.name)
+&& Objects.equals(retentionPeriod, that.retentionPeriod)
+&& Objects.equals(emitStrategy, that.emitStrategy);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(name, retentionPeriod, emitStrategy);
+}
+
+@Override
+public String toString() {
+return "DslSessionParams{" +
+"name='" + name + '\'' +

Review Comment:
   Actually looks like we have it in the KV and Window params as well. If it's 
intentional it should probably be on every line, but I'm guessing it's not?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398002738


##
streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslSessionParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
+ */
+public class DslSessionParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final EmitStrategy emitStrategy;
+
+/**
+ * @param name  name of the store (cannot be {@code null})
+ * @param retentionPeriod   length of time to retain data in the store 
(cannot be negative)
+ *  (note that the retention period must be at least long enough to
+ *  contain the inactivity gap of the session and the entire grace period.)
+ * @param emitStrategy  defines how to emit results
+ */
+public DslSessionParams(
+final String name,
+final Duration retentionPeriod,
+final EmitStrategy emitStrategy
+) {
+Objects.requireNonNull(name);
+this.name = name;
+this.retentionPeriod = retentionPeriod;
+this.emitStrategy = emitStrategy;
+}
+
+public String name() {
+return name;
+}
+
+public Duration retentionPeriod() {
+return retentionPeriod;
+}
+
+public EmitStrategy emitStrategy() {
+return emitStrategy;
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final DslSessionParams that = (DslSessionParams) o;
+return Objects.equals(name, that.name)
+&& Objects.equals(retentionPeriod, that.retentionPeriod)
+&& Objects.equals(emitStrategy, that.emitStrategy);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(name, retentionPeriod, emitStrategy);
+}
+
+@Override
+public String toString() {
+return "DslSessionParams{" +
+"name='" + name + '\'' +

Review Comment:
   what's with the `+ '\''` (guessing a copy/paste error?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398002400


##
streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltInDslStoreSuppliers {

Review Comment:
   Actually maybe the stuff I listed above should be in the `DslStoreSuppliers` 
interface javadocs instead of here. But I still think this class should have 
javadocs, even just a sentence or two



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1397992729


##
streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltInDslStoreSuppliers {

Review Comment:
   I think we should probably have javadocs for this class and the two built-in 
implementations. They can be brief but should cover at least how/where to use 
them:
   1. how to configure an app-wide global default (including the config name) -- see the sketch below
   2. how to configure individual operators via Materialized and StreamJoined
   3. the hierarchy of preference if specified in multiple places (ie 
StreamJoined/Materialized overrides the configured global default, which overrides 
the unconfigured Streams default)
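   
   For (1), even a short snippet would go a long way. Roughly (a sketch that assumes the 
config is exposed as StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG and uses the Dsl*Params 
accessors quoted elsewhere in this PR, so treat the names as placeholders):
   
       import java.util.Map;
       import org.apache.kafka.streams.StreamsConfig;
       import org.apache.kafka.streams.state.*;
   
       // a custom DslStoreSuppliers that backs every DSL store with the persistent built-ins
       public class MyDslStoreSuppliers implements DslStoreSuppliers {
           @Override
           public void configure(final Map<String, ?> configs) { }   // no-op Configurable hook
   
           @Override
           public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
               return Stores.persistentKeyValueStore(params.name());
           }
   
           @Override
           public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
               return Stores.persistentWindowStore(params.name(), params.retentionPeriod(),
                   params.windowSize(), params.retainDuplicates());
           }
   
           @Override
           public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) {
               return Stores.persistentSessionStore(params.name(), params.retentionPeriod());
           }
       }
   
       // app-wide default:
       // props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, MyDslStoreSuppliers.class);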



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1397987652


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+public class StreamJoinedStoreFactory extends 
AbstractConfigurableStoreFactory {
+
+private final String name;
+private final JoinWindows windows;
+private final Serde valueSerde;
+private final WindowBytesStoreSupplier storeSupplier;
+private final StreamJoinedInternal joinedInternal;
+
+private boolean loggingEnabled;
+private final Map logConfig;
+
+public enum Type {
+THIS,
+OTHER
+}
+
+public StreamJoinedStoreFactory(
+final String name,
+final JoinWindows windows,
+final StreamJoinedInternal joinedInternal,
+final Type type
+) {
+super(joinedInternal.dslStoreSuppliers());
+this.name = name + "-store";
+this.joinedInternal = joinedInternal;
+this.windows = windows;
+this.loggingEnabled = joinedInternal.loggingEnabled();
+this.logConfig = new HashMap<>(joinedInternal.logConfig());
+
+// since this store is configured to retain duplicates, we should
+// not compact, so we override the configuration to make sure that
+// it's just delete (window stores are configured to compact,delete)
+this.logConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);

Review Comment:
   I guess it would be nice to have a test for this logic, but honestly that 
could probably be saved for the follow-up work mentioned in that TODO, which 
would be a more involved refactoring of the changelog topic configuration.
   
   (Haven't gotten to the tests yet so maybe you already did this)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-17 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1397982732


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##
@@ -66,6 +66,10 @@ default void configure(final StreamsConfig config) {
 
 boolean isVersionedStore();
 
+// TODO: consider moving all the log configuration code 
(InternalTopicConfig)

Review Comment:
   this would be a great cleanup, I'm definitely not a fan of the 
`#isWindowStore` and `retentionPeriod` logic which kind of breaks the 
abstraction. 
   
   Just wondering, would we also be able to get rid of the `#historyRetention` 
and `#isVersionedStore` or is that used for something else?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+
+public class OuterStreamJoinStoreFactory extends 
AbstractConfigurableStoreFactory {
+
+private final String name;
+private final StreamJoinedInternal streamJoined;
+private final JoinWindows windows;
+private boolean loggingEnabled;
+
+public enum Type {
+RIGHT,
+LEFT
+}
+
+public OuterStreamJoinStoreFactory(
+final String name,
+final StreamJoinedInternal streamJoined,
+final JoinWindows windows,
+final Type type
+) {
+super(streamJoined.dslStoreSuppliers());
+this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + 
"-store";
+this.streamJoined = streamJoined;
+this.windows = windows;
+this.loggingEnabled = streamJoined.loggingEnabled();
+}
+
+@Override
+public StateStore build() {
+final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
+final Duration windowSize = Duration.ofMillis(windows.size());
+final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be 
negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the 
window store "
++ name + " must be no smaller than its window size. Got 
size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde 
= new TimestampedKe

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-17 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1397923196


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state;
+ * a decreasing count metric would indicate a bug.
+ *
+ * 
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time". The standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample values
+ * might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * 
+ *
+ * The Sum, in contrast, is a sample sum and not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as two metrics: a rate and a count. For example,
+ * org.apache.kafka.common.network.Selector has a Meter metric for "connection-close", but it has to be
+ * created with a "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it is compose

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397751986


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-17 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1397895468


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 metrics: a rate and 
a count. For eg:
+ * org.apache.kafka.common.network.Selector has Meter metric for 
"connection-close" but it has to be
+ * created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it is compose

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817091571

   Thanks @junrao for leaving the comments, I have tried to address them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397875576


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), 
instanceMetadata, 0, 0,
+null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+}
+
+@Test
+public void testMaybeUpdateRequestEpochValid() {
+// First request should be accepted.
+
assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));

Review Comment:
   As the class/methods I am testing do not have a `Time` reference but rather just 
update the epoch supplied, I didn't see any value in using MockTime. However, I did 
change the code and tests for ClientMetricsManager, where these methods are invoked 
from: I started using Time in ClientMetricsManager and the corresponding MockTime in 
ClientMetricsManagerTest, which eliminated the use of `Thread.sleep`.
   
   Please let me know if I am missing anything here.
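   
   For readers following along, a minimal, self-contained sketch of the MockTime
   pattern referred to above (the `IntervalGate` class below is a made-up stand-in
   for illustration, not the real ClientMetricsManager):
   
   ```
   import org.apache.kafka.common.utils.MockTime;
   import org.apache.kafka.common.utils.Time;
   
   public class MockTimeExample {
       // Stand-in for a class that rejects requests arriving before a configured interval.
       static class IntervalGate {
           private final Time time;
           private final long intervalMs;
           private long lastAcceptedMs = -1;
   
           IntervalGate(Time time, long intervalMs) {
               this.time = time;
               this.intervalMs = intervalMs;
           }
   
           boolean tryAccept() {
               long now = time.milliseconds();
               if (lastAcceptedMs >= 0 && now - lastAcceptedMs < intervalMs) {
                   return false; // interval not yet elapsed
               }
               lastAcceptedMs = now;
               return true;
           }
       }
   
       public static void main(String[] args) {
           MockTime time = new MockTime();
           IntervalGate gate = new IntervalGate(time, 1000);
           System.out.println(gate.tryAccept()); // true
           System.out.println(gate.tryAccept()); // false, 0 ms elapsed
           time.sleep(1001);                     // advances the mock clock, no real wait
           System.out.println(gate.tryAccept()); // true again
       }
   }
   ```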



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397873419


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   I have moved the `ClientMetricsManager` static instance and removed the code 
from ControllerServer.scala; now initialization only happens in 
`BrokerServer.scala`. Please let me know your thoughts on 
`ClientMetricsReceiverPlugin`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397871398


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   I have removed the conditional logic in this class and set `0` as the 
throttle time in case of error in ClientMetricsManager. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-17 Thread via GitHub


soarez commented on PR #14794:
URL: https://github.com/apache/kafka/pull/14794#issuecomment-1817065152

   @dajac please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-17 Thread via GitHub


soarez opened a new pull request, #14794:
URL: https://github.com/apache/kafka/pull/14794

   Building AssignReplicasToDirsRequestData relies on iteration over Map 
entries, which can result in different sorting order. The order does not matter 
to the semantics of the request, but it can cause issues with test assertions.
   
   This issue was introduced in #14369
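   
   Illustrative only: one common way to make such an assertion independent of Map
   iteration order is to normalize both sides before comparing (the actual data
   types used in AssignmentsManagerTest may differ):
   
   ```
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   
   public class OrderInsensitiveAssertionExample {
       // Sort copies of both sides so that Map iteration order cannot fail the test.
       static void assertSameElements(List<String> expected, List<String> actual) {
           List<String> e = new ArrayList<>(expected);
           List<String> a = new ArrayList<>(actual);
           e.sort(Comparator.naturalOrder());
           a.sort(Comparator.naturalOrder());
           assertEquals(e, a);
       }
   }
   ```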
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-17 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1397839267


##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+
+import java.util.Optional;
+
+/**
+ * Interactive query for issuing range queries and scans over Timestamped
+ * KeyValue stores.

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   As discussed earlier, for `THROTTLING_QUOTA_EXCEEDED`, we shouldn't set  
`throttleTimeMs` since we don't want the client to mute the channel for all 
requests.
   
   Also, I noticed that sometimes we do set `throttleTimeMs` with an error. 
However, this only happens in KafkaApis with the generic request throttling 
logic. So, for now, I'd recommend that we get rid of this check here and rely 
on the caller to set `throttleTimeMs` properly. For example, for all 
`errorResponse(int throttleTimeMs, Errors errors) `and `getErrorResponse(int 
throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass 
in 0 for `throttleTimeMs`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397795152


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Yeah, I re-checked the code today and we shouldn't require any change in 
ControllerServer.scala, as the APIs are not supported on the controller and we 
won't even require KIP-1000 to be supported on controllers. I am making the change 
to initialize ClientMetricsManager only from KafkaServer.scala.
   
   What do you think about `ClientMetricsReceiverPlugin` based on the usage in 
the tagged PR above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-17 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397715022


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) 
{
 internalProcessorContext = (InternalProcessorContext) 
context;
 
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
-droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(),
+metrics);

Review Comment:
   ditto



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   This is because previously we looked at the outer store and then joined, but this 
change makes us join first and then look at the outer store. The ts in the outer 
store and the other store is hard to reason about. If we change the ts of key 0 to 
be 100 and the ts of key 1 to be 50, the original test would still produce key 0 
first even though it has the larger ts... So unless we compare the ts of the joined 
and outer records at the same time when we output, we can't guarantee the ts 
ordering of the output.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -60,24 +61,21 @@ class KStreamKStreamJoin implements 
ProcessorSupplier joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, 
final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner, final boolean outer,
+final Optional outerJoinWindowName, final 
TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
   Change this back? I think for Kafka streams, the convention is to align with 
first param's indentation



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -165,40 +155,52 @@ public void process(final Record record) {
 // problem:
 //
 // Say we have a window size of 5 seconds
-//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
-// The record is not processed yet, and is added to 
the outer-join store
-//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
-// The record is not processed yet, and is added to 
the outer-join store
-//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
-// It is time to look at the expired records. T10 and 
T2 should be emitted, but
-// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+// 1. A non-joined record with time T10 is seen in the 
left-topic
+// (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to the 
outer-join store
+// 2. A non-joined record with time T2 is seen in the 
right-topic
+// (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to the 
outer-join store
+// 3. A joined record with time T11 is seen in the 
left-topic
+// (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and T2 
should be 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   We shouldn't set throttleTimeMs even for Errors.THROTTLING_QUOTA_EXCEEDED. 
throttleTimeMs can only be set when there is no error.



##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Hmm, why is ClientMetricsManager needed in the controller? I thought only 
brokers can receive get/push telemetric requests.



##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new Client

Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


AndrewJSchofield commented on PR #14789:
URL: https://github.com/apache/kafka/pull/14789#issuecomment-1816929338

   Test failures reviewed and unrelated to the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [LI-HOTFIX] Enable register watch when getting children for federated topic namespaces [kafka]

2023-11-17 Thread via GitHub


kehuum closed pull request #14793: [LI-HOTFIX] Enable register watch when 
getting children for federated topic namespaces
URL: https://github.com/apache/kafka/pull/14793


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [LI-HOTFIX] Enable register watch when getting children for federated topic namespaces [kafka]

2023-11-17 Thread via GitHub


kehuum opened a new pull request, #14793:
URL: https://github.com/apache/kafka/pull/14793

   This pr sets registerWatch to true when getting children for federated topic 
namespaces so that the corresponding znode change handler can be registered and 
triggered properly.
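   
   For illustration, using the plain ZooKeeper client API rather than Kafka's
   internal wrappers: passing `true` for the watch flag is what registers the
   watcher, so a later child change under the path fires the handler (the class and
   method names below are hypothetical):
   
   ```
   import java.util.List;
   
   import org.apache.zookeeper.KeeperException;
   import org.apache.zookeeper.ZooKeeper;
   
   public class RegisterWatchExample {
       // true => register the session's default watcher for child changes on this path
       static List<String> childrenWithWatch(ZooKeeper zk, String namespacePath)
               throws KeeperException, InterruptedException {
           return zk.getChildren(namespacePath, true);
       }
   }
   ```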
   
   LI_DESCRIPTION =
   To register and trigger znode change listener under 
/federatedTopics/namespace path, we need to set registerWatch to true when 
getting its children.
   
   Tested with snapshot build in kilo cluster.
   
   EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been 
migrated to xinfra clients and all topic CUDs go through xmd; at that point we 
won't need the kafka broker to understand both the old and new topic acl, it will 
only need to understand the new acl.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]

2023-11-17 Thread via GitHub


gharris1727 commented on PR #14762:
URL: https://github.com/apache/kafka/pull/14762#issuecomment-1816905171

   > What was the conclusion here? Is triggering wakeup on a separate thread 
(through WorkerSinkTask::stop) before also eventually closing the consumer on a 
separate thread (through WorkerSinkTask::cancel) safe? Looks like it probably 
might be, but I was only able to take a cursory look.
   
   I don't think it is. For example, it looks like doCommitSync catches the 
WakeupException and then makes another call to the consumer, and that call 
could be ongoing when cancel()/close() is called. I think also that after 
wakeup() exits, there's no guarantee that the other thread has actually thrown 
the WakeupException and released the consumer lock. Since the plugin has access 
to the WorkerSinkTaskContext, it can also perform an infinite loop and catch 
all of the WakeupExceptions.
   
   I think that preventing ConcurrentModificationExceptions completely will 
require heavy synchronization, which gives more opportunities for the task 
thread to block the herder thread. If we assume that the framework has no infinite 
loops (which may not be the case given the behavior of doCommitSync), we could 
make cancellation only happen if the task thread is in plugin code, and the 
consumer is unlikely to be in use. This could be done with an AtomicReference, 
which is the same synchronization primitive that the Consumer is using to fire 
the ConcurrentModificationExceptions.
   
   I'll look into it more, I don't think we should merge this change until we 
have a handle on the CMEs. 
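   
   A rough, self-contained sketch of that AtomicReference idea (state names and
   methods below are hypothetical, not Connect APIs): the herder only closes the
   consumer if it can atomically claim it while the task thread is inside plugin
   code; otherwise the task thread observes the cancellation afterwards and shuts
   itself down before touching the consumer again.
   
   ```
   import java.util.concurrent.atomic.AtomicReference;
   
   public class ConsumerCancellationGuard {
       enum State { TASK_OWNS, IN_PLUGIN, CANCELLED }
   
       private final AtomicReference<State> state = new AtomicReference<>(State.TASK_OWNS);
   
       // Task thread: wraps a call into plugin code, during which the consumer is idle.
       public void runPlugin(Runnable pluginCall) {
           state.set(State.IN_PLUGIN);
           try {
               pluginCall.run();
           } finally {
               // Keep CANCELLED if the herder claimed the consumer while we were in the plugin.
               state.compareAndSet(State.IN_PLUGIN, State.TASK_OWNS);
           }
       }
   
       // Herder thread: returns true only if the consumer can be closed right now.
       public boolean tryCancelNow() {
           return state.compareAndSet(State.IN_PLUGIN, State.CANCELLED);
       }
   
       // Task thread: checked after each plugin call, before using the consumer again.
       public boolean cancelled() {
           return state.get() == State.CANCELLED;
       }
   }
   ```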
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-17 Thread via GitHub


rreddy-22 commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1397683148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

[PR] MINOR: Log the ZK dual-write time [kafka]

2023-11-17 Thread via GitHub


mumrah opened a new pull request, #14792:
URL: https://github.com/apache/kafka/pull/14792

   We already capture the ZK dual write timings as metrics. This patch includes 
those timings in the existing log messages.
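   
   Purely illustrative of the kind of change described; the actual log message,
   metric, and variable names in the migration code may differ:
   
   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class DualWriteLoggingSketch {
       private static final Logger log = LoggerFactory.getLogger(DualWriteLoggingSketch.class);
   
       // Surface the already-measured dual-write duration in the existing log line.
       static void logDualWrite(int recordCount, long zkWriteDurationMs) {
           log.info("Completed dual-write of {} records to ZooKeeper in {} ms",
               recordCount, zkWriteDurationMs);
       }
   }
   ```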


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15365) Broker-side replica management changes

2023-11-17 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-15365:
-

Assignee: Omnia Ibrahim

> Broker-side replica management changes
> --
>
> Key: KAFKA-15365
> URL: https://issues.apache.org/jira/browse/KAFKA-15365
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Omnia Ibrahim
>Priority: Major
>
> On the broker side, process metadata changes to partition directories as the 
> broker catches up to metadata, as described in KIP-858 under "Replica 
> management".
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Do not check whether updating tasks exist in the waiting loop [kafka]

2023-11-17 Thread via GitHub


cadonna opened a new pull request, #14791:
URL: https://github.com/apache/kafka/pull/14791

   The state updater waits on a condition variable if no tasks exist that need 
to be updated. The condition variable is wrapped by a loop to account for 
spurious wake-ups, and the check whether updating tasks exist is done in the 
condition of the loop. However, only the state updater thread can change whether 
updating tasks exist, and since that thread is the one waiting on the condition 
variable, the check will always return the same value for as long as it waits. 
Thus, the check only needs to be done once, before entering the loop.
   
   This PR moves the check before the loop, which also makes the usage of mocks 
more robust since the processing becomes more deterministic.
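   
   A minimal, self-contained sketch of the pattern described above (the lock,
   condition, and collection names are hypothetical, not the real state updater
   fields):
   
   ```
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;
   
   public class WaitLoopSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private final Condition tasksAvailable = lock.newCondition();
       private final Deque<Object> tasksToAdd = new ArrayDeque<>();    // filled by other threads
       private final Deque<Object> updatingTasks = new ArrayDeque<>(); // touched only by the updater thread
   
       void waitIfIdle() throws InterruptedException {
           lock.lock();
           try {
               // Only the updater thread changes updatingTasks, so its emptiness cannot
               // change while we wait: check it once, outside the spurious-wake-up loop.
               if (updatingTasks.isEmpty()) {
                   while (tasksToAdd.isEmpty()) {
                       tasksAvailable.await();
                   }
               }
           } finally {
               lock.unlock();
           }
       }
   }
   ```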
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-17 Thread via GitHub


OmniaGM opened a new pull request, #14790:
URL: https://github.com/apache/kafka/pull/14790

   - Extend the ReplicaAlterLogDirsThread to send an 
AssignReplicasToDirsRequestData RPC before promoting the future replica in JBOD 
mode with KRAFT. 
   - Extend AssignmentsManager and DirectoryEventHandler to attach a callback 
to the events so we can track the state. 
   - The state of the RPCs is kept in memory in ReplicaAlterLogDirsThread and 
gets updated via the callback passed to AssignmentsManager. Once we promote the 
future replica, we clear the RPC state for that replica (a rough sketch of this 
bookkeeping follows below). 
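   
   A rough sketch of the in-memory bookkeeping described in the last point; all
   names are hypothetical and do not reflect the actual ReplicaAlterLogDirsThread
   fields:
   
   ```
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   import org.apache.kafka.common.TopicPartition;
   
   public class DirAssignmentStateSketch {
       enum AssignmentState { QUEUED, COMPLETED, FAILED }
   
       private final Map<TopicPartition, AssignmentState> inflight = new ConcurrentHashMap<>();
   
       // Record that an AssignReplicasToDirs RPC has been queued for this partition.
       void onAssignmentQueued(TopicPartition tp) {
           inflight.put(tp, AssignmentState.QUEUED);
       }
   
       // Callback handed to the AssignmentsManager, invoked when the RPC completes.
       void onAssignmentCompleted(TopicPartition tp, boolean success) {
           inflight.put(tp, success ? AssignmentState.COMPLETED : AssignmentState.FAILED);
       }
   
       // The future replica is only promoted once the assignment has completed.
       boolean readyToPromote(TopicPartition tp) {
           return inflight.get(tp) == AssignmentState.COMPLETED;
       }
   
       // After promotion, drop the state kept for this replica.
       void onPromoted(TopicPartition tp) {
           inflight.remove(tp);
       }
   }
   ```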
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-17 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1397287511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private 
CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);

Review Comment:
   I still find this logic quite complex to follow. I wonder if we could be a 
little more explicit. I think that the complexity comes from 
`throwIfStaticMemberValidationFails`, which hides quite a lot of the logic.
   
   I wonder if something as follow would be better. I am not sure... What do 
you think?
   
   ```
   ConsumerGroupMember existingMember = group.staticMember(instanceId);
   if (memberEpoch == 0) {
  // A new static member joins or the existing static member rejoins.
  if (existingMember == null) {
 // New static member.
 member = group.getOrMaybeCreateMember(memberId, true);
 updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
  } else if (existingMember.memberId().equals(memberId)) {
 // Static member rejoins the group with epoch 0.
 member = existingMember;
 updatedMemberBuilder = new ConsumerGroupMember.Builder(existingMember);
  } else {
 // Static member rejoins with a different instance id so it should 
replace
 // the previous instance iff the previous instance has -2.
 if (existingMember.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// The new member can't join.
throw Errors.UNRELEASED_INSTANCE_ID.exception(...);
 } else {
// Replace the current member.
   staticMemberReplaced = true;
   member = existingMember;
   updatedMemberBuilder = new 
ConsumerGroupMember.Builder(group.getOrMaybeCreateMember(memberId, true));
   removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
 }
  }
   } else {
  // Check member id or throw FENCED_INSTANCE_ID
  // Check epoch with throwIfMemberEpochIsInvalid
   }
   ```
   
   Note that I just wrote this without testing it so the code is likely not 
100% correct :).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]

2023-11-17 Thread via GitHub


dajac commented on code in PR #14582:
URL: https://github.com/apache/kafka/pull/14582#discussion_r1397610751


##
tests/kafkatest/tests/core/transactions_test.py:
##
@@ -246,8 +246,9 @@ def setup_topics(self):
 @matrix(failure_mode=["hard_bounce", "clean_bounce"],
 bounce_target=["brokers", "clients"],
 check_order=[True, False],
-use_group_metadata=[True, False])
-def test_transactions(self, failure_mode, bounce_target, check_order, 
use_group_metadata, metadata_quorum=quorum.all):
+use_group_metadata=[True, False],
+use_new_coordinator=[True, False])

Review Comment:
   All the kraft tests failed in that one...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-17 Thread via GitHub


dajac commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1397600750


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-17 Thread via GitHub


dajac commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1397596576


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-17 Thread via GitHub


soarez commented on code in PR #14737:
URL: https://github.com/apache/kafka/pull/14737#discussion_r1397572189


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -143,21 +143,30 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
   private def getOfflineReplicas(image: MetadataImage,
  partition: PartitionRegistration,
  listenerName: ListenerName): 
util.List[Integer] = {
-// TODO: in order to really implement this correctly, we would need JBOD 
support.
-// That would require us to track which replicas were offline on a 
per-replica basis.
-// See KAFKA-13005.
 val offlineReplicas = new util.ArrayList[Integer](0)
 for (brokerId <- partition.replicas) {
   Option(image.cluster().broker(brokerId)) match {
 case None => offlineReplicas.add(brokerId)
-case Some(broker) => if (broker.fenced() || 
!broker.listeners().containsKey(listenerName.value())) {
+case Some(broker) => if (isReplicaOffline(partition, listenerName, 
broker)) {
   offlineReplicas.add(brokerId)
 }
   }
 }
 offlineReplicas
   }
 
+  private def isReplicaOffline(partition: PartitionRegistration, listenerName: 
ListenerName, broker: BrokerRegistration) = {
+broker.fenced() || !broker.listeners().containsKey(listenerName.value()) 
|| isInOfflineDir(broker, partition)
+  }
+
+  private def isInOfflineDir(broker: BrokerRegistration, partition: 
PartitionRegistration): Boolean = {

Review Comment:
   Good point. Missed that. Updated, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-17 Thread via GitHub


dajac commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1816721578

   I've also seen `Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – 
kafka.server.AssignmentsManagerTest` failing consistently in a few builds 
([example](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2392/tests)).
 @soarez Could you please double check this test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-17 Thread via GitHub


pprovenzano commented on code in PR #14737:
URL: https://github.com/apache/kafka/pull/14737#discussion_r1397547750


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -143,21 +143,30 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
   private def getOfflineReplicas(image: MetadataImage,
  partition: PartitionRegistration,
  listenerName: ListenerName): 
util.List[Integer] = {
-// TODO: in order to really implement this correctly, we would need JBOD 
support.
-// That would require us to track which replicas were offline on a 
per-replica basis.
-// See KAFKA-13005.
 val offlineReplicas = new util.ArrayList[Integer](0)
 for (brokerId <- partition.replicas) {
   Option(image.cluster().broker(brokerId)) match {
 case None => offlineReplicas.add(brokerId)
-case Some(broker) => if (broker.fenced() || 
!broker.listeners().containsKey(listenerName.value())) {
+case Some(broker) => if (isReplicaOffline(partition, listenerName, 
broker)) {
   offlineReplicas.add(brokerId)
 }
   }
 }
 offlineReplicas
   }
 
+  private def isReplicaOffline(partition: PartitionRegistration, listenerName: 
ListenerName, broker: BrokerRegistration) = {
+broker.fenced() || !broker.listeners().containsKey(listenerName.value()) 
|| isInOfflineDir(broker, partition)
+  }
+
+  private def isInOfflineDir(broker: BrokerRegistration, partition: 
PartitionRegistration): Boolean = {

Review Comment:
   Do we have to handle UNASSIGNED here? 
   It would return false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


AndrewJSchofield commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1397505461


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
 assertEquals(5, recordsToTest.get(1).offset());
 }
 
+/**
+ * KAFKA-15836:
+ * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+ * fetched records are not aligned on max.poll.records boundaries.
+ *
+ * tp0 has records 1,2,3; tp1 has records 6,7,8
+ * max.poll.records is 2
+ * 
+ * poll 1 should return 1,2
+ * poll 2 should return 3,6
+ * poll 3 should return 7,8
+ * 
+ * Or similar :)

Review Comment:
   Well, both client and server do have some kind of next-in-line behaviour I 
think. Not guaranteed, but it does at least attempt to avoid starvation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]

2023-11-17 Thread via GitHub


mimaison commented on PR #14759:
URL: https://github.com/apache/kafka/pull/14759#issuecomment-1816625357

   Is the work complete? If so, can we close [KAFKA-15802](https://issues.apache.org/jira/browse/KAFKA-15802)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


kirktrue commented on PR #14789:
URL: https://github.com/apache/kafka/pull/14789#issuecomment-1816615350

   🤦‍♂️ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


kirktrue commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1397472048


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
 assertEquals(5, recordsToTest.get(1).offset());
 }
 
+/**
+ * KAFKA-15836:
+ * Test that max.poll.records is honoured when consuming from multiple 
topic-partitions and the
+ * fetched records are not aligned on max.poll.records boundaries.
+ *
+ * tp0 has records 1,2,3; tp1 has records 6,7,8
+ * max.poll.records is 2
+ * 
+ * poll 1 should return 1,2
+ * poll 2 should return 3,6
+ * poll 3 should return 7,8
+ * 
+ * Or similar :)

Review Comment:
   I'm pretty sure that there's nothing in the current code that will attempt to pull _N_ records from _M_ partitions fairly. Is it possible that, if the number of records produced to topic _T1_ is >= the value of `maxRecords` at any given point in time, records produced to _T2_ will never be consumed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-17 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787243#comment-17787243
 ] 

Andrew Schofield commented on KAFKA-15836:
--

I would say that it's not applicable to 3.6. From my reading, it looks like a 
refactoring in 3.7 introduced a regression that was not caught by unit tests.

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.6.1
>
>
> We discovered that when KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, the setting is not properly respected 
> for all poll() invocations.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe to 
> multiple topics.  The consumer polls, and the number of returned records can sometimes 
> be larger than 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-17 Thread via GitHub


soarez commented on PR #14737:
URL: https://github.com/apache/kafka/pull/14737#issuecomment-1816570498

   Thanks for the review @cmccabe. I think I have incorporated all your 
suggestions. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-17 Thread via GitHub


AndrewJSchofield opened a new pull request, #14789:
URL: https://github.com/apache/kafka/pull/14789

   When returning fetched records, the code was not properly honouring 
`max.poll.records`.
   When it had fetched records for multiple topic-partitions, it was intended 
to accumulate records up to
   `max.poll.records`. Actually, whenever it accumulated records from a 
partition, it was prepared to add
   `max.poll.records` for that partition regardless of how many records it had 
already accumulated,
   rather than reducing the number of records to add based on records already 
added from other partitions.
   
   I added a test case which failed before the code change in the PR and passes 
afterwards.
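   
   To make the intended rule concrete, here is a minimal, self-contained sketch (not the actual Fetcher code; the `Deque`-of-batches model and all names are simplifications used only for illustration). The key point is that the number of records drained from the next batch is capped by how many records are still allowed in this poll, not by `max.poll.records` per partition.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Deque;
   import java.util.List;
   
   public class MaxPollRecordsSketch {
       // Each inner list models the records fetched for one topic-partition.
       static List<String> collect(Deque<List<String>> batches, int maxPollRecords) {
           List<String> collected = new ArrayList<>();
           while (collected.size() < maxPollRecords && !batches.isEmpty()) {
               List<String> batch = batches.peekFirst();
               // Cap by what is still allowed in this poll, not by maxPollRecords per partition.
               int take = Math.min(maxPollRecords - collected.size(), batch.size());
               collected.addAll(batch.subList(0, take));
               batches.pollFirst();
               if (take < batch.size()) {
                   // Keep the remainder of the partially drained batch for the next poll.
                   batches.addFirst(new ArrayList<>(batch.subList(take, batch.size())));
               }
           }
           return collected;
       }
   
       public static void main(String[] args) {
           Deque<List<String>> batches = new ArrayDeque<>();
           batches.add(new ArrayList<>(Arrays.asList("tp0-1", "tp0-2", "tp0-3")));
           batches.add(new ArrayList<>(Arrays.asList("tp1-6", "tp1-7", "tp1-8")));
           System.out.println(collect(batches, 2)); // [tp0-1, tp0-2]
           System.out.println(collect(batches, 2)); // [tp0-3, tp1-6]
           System.out.println(collect(batches, 2)); // [tp1-7, tp1-8]
       }
   }
   ```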
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1397321134


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -180,6 +180,18 @@ public void maybeAutoCommit(final Map offsets
 autocommit.setInflightCommitStatus(true);
 }
 
+/**
+ * The consumer needs to send an auto commit during the shutdown
+ */
+public List maybeAutoCommitOnClose() {

Review Comment:
   Can this be package-private?
   
   Could this return an optional instead of a list?
   
   I'd drop the "onClose" because that doesn't seem to matter here. The method 
does a `maybeAutoCommit`. The fact that it's executed on close belongs to the 
calling context no?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java:
##
@@ -35,7 +35,7 @@ public interface RequestManager {
  * synchronization protection in this method's implementation.
  *
  * 
- *
+ *Close(

Review Comment:
   whoops



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -561,23 +583,27 @@ boolean hasUnsentRequests() {
 
 OffsetCommitRequestState addOffsetCommitRequest(final 
Map offsets) {

Review Comment:
   nit: put overloads next to each other in the file (here you put 
`createOffsetCommitRequest` in between the two overloads for 
`addOffsetCommitRequest`)



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -231,6 +243,17 @@ private void handleCoordinatorDisconnect(Throwable 
exception, long currentTimeMs
 }
 }
 
+@Override
+public NetworkClientDelegate.PollResult pollOnClose() {
+if (!pendingRequests.hasUnsentRequests() || 
!coordinatorRequestManager.coordinator().isPresent())
+return EMPTY;
+
+sendAutoCommit(subscriptions.allConsumed());

Review Comment:
   We seem to have two code paths here dealing with the auto commit, and I haven't quite grasped what the idea is here. For example, we seem to be creating the auto-commit request twice during close:
   
   ```
   ConsumerNetworkThread.coordinatorOnClose -> 
ConsumerNetworkThread.closingTasks -> 
ConsumerNetworkThread.maybeAutoCommitOnClose -> 
CommitRequestManager.maybeAutoCommitOnClose -> 
CommitRequestManager.createOffsetCommitRequest
   ```
   
   As well as:
   
   ```
   ConsumerNetworkThread.runAtClose -> CommitRequestManager.pollOnClose -> 
CommitRequestManager.sendAutoCommit -> 
CommitRequestManager.addOffsetCommitRequest -> 
CommitRequestManager.creatOffsetCommitRequest
   ```
   
   Could you add a bit of context to the PR so that it's easier for me to understand what the thinking is behind the shutdown procedure?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+coordinatorOnClose(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+void coordinatorOnClose(final Timer timer) {
+if (!requestManagers.coordinatorRequestManager.isPresent())
+return;
+
+connectCoordinator(timer);
+
+List tasks = closingTasks();
+do {
+long currentTimeMs = timer.currentTimeMs();
+connectCoordinator(timer);
+networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+}
+
+private void connectCoordinator(final Timer timer) {
+while (!coordinatorReady()) {
+findCoordinatorSync(timer);
+}
+}
+
+private boolean coordinatorReady() {
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+Optional coordinator = coordinatorRequestManager.coordinator();
+return coordinator.isPresent() && 
!networkClientDelegate.isUnavailable(coordinator.get());
+}
+
+private void findCoordinatorSync(final Timer timer) {
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+long currentTimeMs = timer.currentTimeMs();
+NetworkClientDelegate.PollResult request = 
coordinatorRequestManager.pollOnClose();
+networkClientDelegate.addAll(request);
+CompletableFuture findCoordinatorRe

[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787217#comment-17787217
 ] 

Mickael Maison commented on KAFKA-15836:


Awesome, thanks [~schofielaj] 

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.6.1
>
>
> We discovered that when KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, the setting is not properly respected 
> for all poll() invocations.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe to 
> multiple topics.  The consumer polls, and the number of returned records can sometimes 
> be larger than 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2023-11-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15793.

Resolution: Fixed

> Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
> ---
>
> Key: KAFKA-15793
> URL: https://issues.apache.org/jira/browse/KAFKA-15793
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Assignee: David Arthur
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2023-11-06 at 11.30.06.png
>
>
> The tests have been flaky since they were introduced in 
> [https://github.com/apache/kafka/pull/14545] (see picture attached).
> The stack traces for the flakiness can be found at 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.zk.ZkMigrationIntegrationTest]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2023-11-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787216#comment-17787216
 ] 

Mickael Maison commented on KAFKA-15793:


Since KAFKA-15799 is now done, marking this as resolved too

> Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
> ---
>
> Key: KAFKA-15793
> URL: https://issues.apache.org/jira/browse/KAFKA-15793
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Assignee: David Arthur
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2023-11-06 at 11.30.06.png
>
>
> The tests have been flaky since they were introduced in 
> [https://github.com/apache/kafka/pull/14545] (see picture attached).
> The stack traces for the flakiness can be found at 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.zk.ZkMigrationIntegrationTest]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15201: Allow git push to fail gracefully [kafka]

2023-11-17 Thread via GitHub


Owen-CH-Leung commented on PR #14645:
URL: https://github.com/apache/kafka/pull/14645#issuecomment-1816422275

   @divijvaidya shall we merge this PR? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15799 Handle full metadata updates on ZK brokers [kafka]

2023-11-17 Thread via GitHub


mumrah commented on PR #14719:
URL: https://github.com/apache/kafka/pull/14719#issuecomment-1816420476

   @mimaison yup, I closed it just now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15038: Add metadatacache into RemoteLogManager, and refactor all relevant codes [kafka]

2023-11-17 Thread via GitHub


Owen-CH-Leung commented on PR #14136:
URL: https://github.com/apache/kafka/pull/14136#issuecomment-1816419916

   @kamalcph 
   
   Thanks. Just rebased. 
   
   Btw, when I rebased, I noticed that there's a failing test `kafka.server.AssignmentsManagerTest.testAssignmentAggregation` on trunk. Maybe we need to fix that as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15799) ZK brokers incorrectly handle KRaft metadata snapshots

2023-11-17 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15799:
-
Fix Version/s: 3.7.0

> ZK brokers incorrectly handle KRaft metadata snapshots
> --
>
> Key: KAFKA-15799
> URL: https://issues.apache.org/jira/browse/KAFKA-15799
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> While working on the fix for KAFKA-15605, I noticed that ZK brokers are 
> unconditionally merging data from UpdateMetadataRequest with their existing 
> MetadataCache. This is not the correct behavior when handling a metadata 
> snapshot from the KRaft controller. 
> For example, if a topic was deleted in KRaft and not transmitted as part of a 
> delta update (e.g., during a failover) then the ZK brokers will never remove 
> the topic from their cache (until they restart and rebuild their cache).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API

2023-11-17 Thread Zihao Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787193#comment-17787193
 ] 

Zihao Lin commented on KAFKA-14507:
---

Hi team, if this ticket is still open, I will pick it up and try to add it. 

> Add ConsumerGroupPrepareAssignment API
> --
>
> Key: KAFKA-14507
> URL: https://issues.apache.org/jira/browse/KAFKA-14507
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Zihao Lin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15849) Fix ListGroups API when runtime partition size is zero

2023-11-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15849.
-
Fix Version/s: 3.7.0
 Reviewer: David Jacot
   Resolution: Fixed

> Fix ListGroups API when runtime partition size is zero
> --
>
> Key: KAFKA-15849
> URL: https://issues.apache.org/jira/browse/KAFKA-15849
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15849: Fix ListGroups API when runtime partition size is zero [kafka]

2023-11-17 Thread via GitHub


dajac merged PR #14785:
URL: https://github.com/apache/kafka/pull/14785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub


cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397156239


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) {
 }
 }
 
+void wakeup() {

Review Comment:
   I used the same format as on the `Consumer` interface. Same for my recent 
change to `wokenup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub


cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397154783


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable {
 private final Condition notEmptyCondition;
 private final IdempotentCloser idempotentCloser = new IdempotentCloser();
 
+private final AtomicBoolean wakedUp = new AtomicBoolean(false);

Review Comment:
   Oh, boy! How embarrassing!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]

2023-11-17 Thread via GitHub


cadonna commented on code in PR #13932:
URL: https://github.com/apache/kafka/pull/13932#discussion_r1396975782


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -125,11 +122,11 @@ public class ProcessorStateManagerTest {
 private OffsetCheckpoint checkpoint;
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateStore store;

Review Comment:
   My IDE tells me that this field is never used.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() {
 
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
 assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-reset(context, store);
-context.uninitialize();
-expect(store.name()).andStubReturn(persistentStoreName);
-replay(context, store);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
 
-verify(context, store);
+verify(context, times(2)).uninitialize();
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 }
 
 @Test
 public void shouldClearStoreCache() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final CachingStore store = EasyMock.createMock(CachingStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.clearCache();

Review Comment:
   I am missing this verification with Mockito and I assume this is the most 
important one given the name of the test.
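   
   Something along these lines would keep the check (a sketch, assuming the Mockito mock is still named `store` as in the EasyMock version):
   
   ```java
   // Verify that the recycled store's cache is actually cleared.
   verify(store).clearCache();
   ```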



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -1430,16 +1385,11 @@ public void 
shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE
 
 @Test
 public void shouldShutdownTaskManagerOnClose() {
-final Consumer consumer = 
EasyMock.createNiceMock(Consumer.class);
 final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-EasyMock.replay(consumerGroupMetadata);
-final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
-taskManager.shutdown(true);
-EasyMock.expectLastCall();
-EasyMock.replay(taskManager, consumer);
+when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+final TaskManager taskManager = mock(TaskManager.class);
+doNothing().when(taskManager).shutdown(true);

Review Comment:
   Please change this to verification instead of a stub.
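   
   i.e. something like (a sketch; the mock name is taken from the snippet above):
   
   ```java
   // After invoking the code under test, assert the interaction instead of stubbing the void method.
   verify(taskManager).shutdown(true);
   ```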



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -766,14 +750,10 @@ public void 
shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws Interrupt
 mockTime
 );
 
-final Consumer mockConsumer = 
EasyMock.createNiceMock(Consumer.class);
-
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());

Review Comment:
   You cannot remove this stub without replacement. Without it, a poll on the consumer will return `null`. Some tests are failing because of that. You haven't seen them failing because of the removal of `@Parameterized`.
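   
   A Mockito replacement along these lines should work (a sketch mirroring the removed EasyMock stub):
   
   ```java
   // Without this stub the mocked consumer's poll() returns null and several tests fail.
   when(mockConsumer.poll(any())).thenReturn(ConsumerRecords.empty());
   ```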



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -2701,32 +2622,20 @@ public void 
shouldThrowTaskMigratedExceptionHandlingRevocation() {
 @SuppressWarnings("unchecked")
 public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
 final StreamsConfig config = new StreamsConfig(configProps(false));
-final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+final TaskManager taskManager = mock(TaskManager.class);
 final Consumer consumer = mock(Consumer.class);
 final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-consumer.subscribe((Collection) anyObject(), anyObject());
-EasyMock.expectLastCall().anyTimes();
-consumer.unsubscribe();
-EasyMock.expectLastCall().anyTimes();
- 

Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on code in PR #14779:
URL: https://github.com/apache/kafka/pull/14779#discussion_r1397139312


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1102,46 +1207,94 @@ public void subscribe(Pattern pattern, 
ConsumerRebalanceListener listener) {
 subscribeInternal(pattern, Optional.of(listener));
 }
 
+/**
+ * Acquire the light lock and ensure that the consumer hasn't been closed.
+ *
+ * @throws IllegalStateException If the consumer has been closed
+ */
+private void acquireAndEnsureOpen() {
+acquire();
+if (this.closed) {
+release();
+throw new IllegalStateException("This consumer has already been 
closed.");
+}
+}
+
+/**
+ * Acquire the light lock protecting this consumer from multithreaded 
access. Instead of blocking
+ * when the lock is not available, however, we just throw an exception 
(since multithreaded usage is not
+ * supported).
+ *
+ * @throws ConcurrentModificationException if another thread already has 
the lock
+ */
+private void acquire() {
+final Thread thread = Thread.currentThread();
+final long threadId = thread.getId();
+if (threadId != currentThread.get() && 
!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+throw new ConcurrentModificationException("KafkaConsumer is not 
safe for multi-threaded access. " +
+"currentThread(name: " + thread.getName() + ", id: " + 
threadId + ")" +
+" otherThread(id: " + currentThread.get() + ")"
+);
+refCount.incrementAndGet();
+}
+
+/**
+ * Release the light lock protecting the consumer from multithreaded 
access.
+ */
+private void release() {
+if (refCount.decrementAndGet() == 0)
+currentThread.set(NO_CURRENT_THREAD);
+}
+
 private void subscribeInternal(Pattern pattern, 
Optional listener) {
-maybeThrowInvalidGroupIdException();
-if (pattern == null || pattern.toString().isEmpty())
-throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+acquireAndEnsureOpen();

Review Comment:
   I took a tiny bit of artistic freedom here to make the lock handling more 
consistent. I think this is still at the level where we can diverge from the 
original consumer, but if you feel strongly about it, we can also check for the 
lock after checking the `groupId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15815) JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled

2023-11-17 Thread Ashwin Pankaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Pankaj reassigned KAFKA-15815:
-

Assignee: Ashwin Pankaj

> JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled
> --
>
> Key: KAFKA-15815
> URL: https://issues.apache.org/jira/browse/KAFKA-15815
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Ashwin Pankaj
>Priority: Minor
>
> By default HttpURLConnection has keep-alive enabled, which allows a single 
> HttpURLConnection to be left open in order to be re-used for later requests. 
> This means that despite JsonRestServer calling `close()` on the relevant 
> InputStream, and calling `disconnect()` on the connection itself, the 
> HttpURLConnection does not call `close()` on the underlying socket.
> This affects the Trogdor AgentTest and CoordinatorTest suites, where most of 
> the methods make HTTP requests using the JsonRestServer. The effect is that 
> ~32 sockets are leaked per test run, all remaining in the CLOSE_WAIT state 
> (half closed) after the test. This is because the JettyServer has correctly 
> closed the connections, but the HttpURLConnection has not.
> There does not appear to be a way to locally override the HttpURLConnection's 
> behavior in this case, and only disabling keep-alive overall (via the system 
> property `http.keepAlive=false`) seems to resolve the socket leaks.
> To prevent the leaks, we can move JsonRestServer to an alternative HTTP 
> implementation, perhaps the jetty-client that Connect uses, or disable 
> keepAlive during tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on code in PR #14779:
URL: https://github.com/apache/kafka/pull/14779#discussion_r1397125331


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -112,6 +112,12 @@ public void testInvalidGroupId() {
 assertThrows(InvalidGroupIdException.class, () -> 
consumer.committed(new HashSet<>()));
 }
 
+@Test
+public void testFailOnClosedConsumer() {
+consumer.close();
+assertThrows(IllegalStateException.class, consumer::assignment);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on code in PR #14779:
URL: https://github.com/apache/kafka/pull/14779#discussion_r1397125030


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1068,12 +1168,17 @@ public void onComplete(Map offsets, Exception
 
 @Override
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
-backgroundEventProcessor.process();
+acquireAndEnsureOpen();

Review Comment:
   Yeah, you are right. While double-locking is fine, I removed the lock. The public modifier here is just an artifact of interfaces not having package-private members.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15851) broker under replicated due to error java.nio.BufferOverflowException

2023-11-17 Thread wangliucheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangliucheng updated KAFKA-15851:
-
Attachment: (was: server.log)

> broker under replicated due to error java.nio.BufferOverflowException
> -
>
> Key: KAFKA-15851
> URL: https://issues.apache.org/jira/browse/KAFKA-15851
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2
> Environment: Kafka Version: 3.3.2
> Deployment mode: zookeeper
>Reporter: wangliucheng
>Priority: Major
> Attachments: p1.png
>
>
> In my Kafka cluster, Kafka was updated from 2.0 to 3.3.2. 
> {*}The first start failed{*} because the same directory was configured
> The error is as follows:
>  
> {code:java}
> [2023-11-16 10:04:09,952] ERROR (main kafka.Kafka$ 159) Exiting Kafka due to 
> fatal exception during startup.
> java.lang.IllegalStateException: Duplicate log directories for 
> skydas_sc_tdevirsec-12 are found in both 
> /data01/kafka/log/skydas_sc_tdevirsec-12 and 
> /data07/kafka/log/skydas_sc_tdevirsec-12. It is likely because log directory 
> failure happened while broker was replacing current replica with future 
> replica. Recover broker from this failure by manually deleting one of the two 
> directories for this partition. It is recommended to delete the partition in 
> the log directory that is known to have failed recently.
>         at kafka.log.LogManager.loadLog(LogManager.scala:305)
>         at kafka.log.LogManager.$anonfun$loadLogs$14(LogManager.scala:403)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2023-11-16 10:04:09,953] INFO (kafka-shutdown-hook kafka.server.KafkaServer 
> 66) [KafkaServer id=1434] shutting down {code}
>  
>  
> *Second,* after removing /data07/kafka/log from log.dirs and starting Kafka, it 
> also reported an error:
>  
> {code:java}
> [2023-11-16 10:13:10,713] INFO (ReplicaFetcherThread-3-1008 
> kafka.log.UnifiedLog 66) [UnifiedLog partition=ty_udp_full-60, 
> dir=/data04/kafka/log] Rolling new log segment (log_size = 
> 755780551/1073741824}, offset_index_size = 2621440/2621440, time_index_size = 
> 1747626/1747626, inactive_time_ms = 2970196/60480).
> [2023-11-16 10:13:10,714] ERROR (ReplicaFetcherThread-3-1008 
> kafka.server.ReplicaFetcherThread 76) [ReplicaFetcher replicaId=1434, 
> leaderId=1008, fetcherId=3] Unexpected error occurred while processing data 
> for partition ty_udp_full-60 at offset 2693467479
> java.nio.BufferOverflowException
>         at java.nio.Buffer.nextPutIndex(Buffer.java:555)
>         at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:794)
>         at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:135)
>         at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
>         at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:510)
>         at kafka.log.LocalLog.$anonfun$roll$9(LocalLog.scala:529)
>         at kafka.log.LocalLog.$anonfun$roll$9$adapted(LocalLog.scala:529)
>         at scala.Option.foreach(Option.scala:437)
>         at kafka.log.LocalLog.$anonfun$roll$2(LocalLog.scala:529)
>         at kafka.log.LocalLog.roll(LocalLog.scala:786)
>         at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1537)
>         at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>         at kafka.log.UnifiedLog.appendAsFollower(UnifiedLog.scala:778)
>         at 
> kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1121)
>         at 
> kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1128)
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:121)
>         at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:336)
>         at scala.Option.foreach(Option.scala:437)
>         at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:325)
>         at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:324)
>         at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>         at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>         at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:3

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397061812


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable {
 private final Condition notEmptyCondition;
 private final IdempotentCloser idempotentCloser = new IdempotentCloser();
 
+private final AtomicBoolean wakedUp = new AtomicBoolean(false);

Review Comment:
   nit: wokenUp?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) {
 }
 }
 
+void wakeup() {

Review Comment:
   nit: `wakeup` vs. `wakeUp` capitalization



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -166,7 +169,7 @@ void awaitNotEmpty(Timer timer) {
 try {
 lock.lock();
 
-while (isEmpty()) {
+while (isEmpty() && !wakedUp.compareAndSet(true, false)) {

Review Comment:
   Nice! I was first convinced that we have an interleaving here that misses the wakeup, but `wakedUp` takes care of it.
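   
   For reference, a minimal sketch of the counterpart I had in mind (assuming `wakeup()` looks roughly like this; the real `FetchBuffer` may differ): setting the flag before signalling means a waiter that has not yet reached `await()` still observes the wakeup via the `compareAndSet` in the loop condition.
   
   ```java
   // Sketch only -- not the actual FetchBuffer implementation.
   void wakeup() {
       wakedUp.set(true);                 // record the wakeup even if nobody is waiting yet
       lock.lock();
       try {
           notEmptyCondition.signalAll(); // wake any thread blocked in awaitNotEmpty()
       } finally {
           lock.unlock();
       }
   }
   ```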



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]

2023-11-17 Thread via GitHub


clolov opened a new pull request, #14788:
URL: https://github.com/apache/kafka/pull/14788

   With the introduction of tiered storage, the ListOffsets API can now return the earliest local timestamp.
   
   The purpose of this pull request is to enable the Kafka tools to retrieve this information in the same way as the earliest, latest and maximum timestamps.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-17 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787128#comment-17787128
 ] 

Andrew Schofield commented on KAFKA-15836:
--

Yes, I would say so. I think I'll have a PR on trunk soon.

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.6.1
>
>
> We discovered that when KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, the setting is not properly respected 
> for all poll() invocations.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe to 
> multiple topics.  The consumer polls, and the number of returned records can sometimes 
> be larger than 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]

2023-11-17 Thread via GitHub


yashmayya commented on code in PR #14762:
URL: https://github.com/apache/kafka/pull/14762#discussion_r1397042412


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -163,6 +163,12 @@ public void initialize(TaskConfig taskConfig) {
 }
 }
 
+@Override
+public void cancel() {
+super.cancel();
+Utils.closeQuietly(consumer, "consumer");

Review Comment:
   ```suggestion
   Utils.closeQuietly(consumer, "consumer for sink task: " + id);
   ```
   nit



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -767,11 +775,7 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() 
throws Exception {
 connect.kafka().produce(TOPIC, 0, "key", "value");
 }
 
-// Configure a sink connector whose sink task blocks in its stop method
-Map<String, String> connectorConfigs = new HashMap<>();
-connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSinkConnector.class.getName());
-connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-connectorConfigs.put("block", "Task::stop");
+Map<String, String> connectorConfigs = baseSinkConnectorConfigs();

Review Comment:
   Same as above, `baseSinkConnectorConfigs()` can be used directly in the call 
to `configureConnector`



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -408,27 +412,30 @@ public void 
testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
 connect.kafka().produce(TOPIC, 0, "key", "value");
 }
 
-// Configure a sink connector whose sink task blocks in its stop method
-Map<String, String> connectorConfigs = new HashMap<>();
-connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSinkConnector.class.getName());
-connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-connectorConfigs.put("block", "Task::stop");
+Map<String, String> connectorConfigs = baseSinkConnectorConfigs();

Review Comment:
   This can be used directly now (i.e. without defining a separate local 
variable)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-17 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-15836:


Assignee: Andrew Schofield  (was: Kirk True)

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.6.1
>
>
> We discovered that when a KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, the limit is not properly respected for every 
> poll() invocation.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe it to 
> multiple topics.  The consumer polls, and the number of returned records can 
> sometimes exceed 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-11-17 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-14877:


Assignee: (was: Kamal Chandraprakash)

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Minor
> Fix For: 3.7.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


clolov commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1816112742

   I will aim to provide a review today!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


kamalcph commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) 
throws RemoteStorageExcepti
 // of a segment with that epoch copied into remote storage. If 
it can not find an entry then it checks for the
 // previous leader epoch till it finds an entry, If there are 
no entries till the earliest leader epoch in leader
 // epoch cache then it starts copying the segments from the 
earliest epoch entry's offset.
-copiedOffsetOption = 
OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+copiedOffsetOption = 
Optional.of(findHighestRemoteOffset(topicIdPartition, log));
 logger.info("Found the highest copiedRemoteOffset: {} for 
partition: {} after becoming leader, " +
 "leaderEpoch: {}", copiedOffsetOption, 
topicIdPartition, leaderEpoch);
+copiedOffsetOption.ifPresent(epochAndOffset ->  
log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));

Review Comment:
   After broker restart, if there are no more segments to upload, then the 
`copiedOffset` might be stale. It's good to update the 
`highestOffsetInRemoteStorage` in the UnifiedLog once we compute it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


kamalcph opened a new pull request, #14787:
URL: https://github.com/apache/kafka/pull/14787

   `findHighestRemoteOffset` does not take the leader-epoch end offset into 
account. This can cause log divergence between the local and remote log segments 
when there is an unclean leader election.
   
   To handle it correctly, the logic to find the highest remote offset can be 
updated to:
   
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, 
highest-remote-offset-for-epoch)
   ```
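   
   As a rough Java sketch of that rule (class, method, and parameter names are 
assumptions, not the PR's actual code):
   
   ```java
   import java.util.OptionalLong;
   
   final class HighestRemoteOffset {
       // Cap the highest offset found in remote storage for an epoch by the end
       // offset that the local leader-epoch checkpoint holds for the same epoch,
       // so segments written by a divergent lineage (e.g. after an unclean leader
       // election) are not treated as already copied.
       static OptionalLong forEpoch(OptionalLong epochEndOffsetInCheckpoint,
                                    OptionalLong highestRemoteOffsetForEpoch) {
           if (!epochEndOffsetInCheckpoint.isPresent() || !highestRemoteOffsetForEpoch.isPresent())
               return OptionalLong.empty();
           return OptionalLong.of(Math.min(epochEndOffsetInCheckpoint.getAsLong(),
                                           highestRemoteOffsetForEpoch.getAsLong()));
       }
   }
   ```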
   
   Discussion thread: 
https://github.com/apache/kafka/pull/14004#discussion_r1266864272
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15799 Handle full metadata updates on ZK brokers [kafka]

2023-11-17 Thread via GitHub


mimaison commented on PR #14719:
URL: https://github.com/apache/kafka/pull/14719#issuecomment-1816083065

   Can we now close https://issues.apache.org/jira/browse/KAFKA-15799?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787118#comment-17787118
 ] 

Mickael Maison commented on KAFKA-15836:


This is marked as a blocker for 3.6.1. We're hoping to make this release in 
the next couple of weeks. Do you think you can provide a fix by then?

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.6.1
>
>
> We discovered that when a KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, the limit is not properly respected for every 
> poll() invocation.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe it to 
> multiple topics.  The consumer polls, and the number of returned records can 
> sometimes exceed 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-17 Thread via GitHub


philipnee commented on PR #14710:
URL: https://github.com/apache/kafka/pull/14710#issuecomment-1816006917

   Hi @lucasbru @cadonna - If you get a chance, could you take a look at the 
PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub


cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816006382

   > Do you think it is worth adding a ticket about using the 
BackgroundEventQueue for the fetches? Let me know if you have a concrete idea 
about how to implement this.
   
   Let me think about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub


cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816005251

   > I wonder if we could just use a BlockingQueue for the fetchBuffer because 
fetchBuffer.poll(time) blocks until non-empty or timeout.
   
   As far as I can see from the javadocs of `BlockingQueue`, there is no way to 
wake up from `poll()` other than interrupting the thread. I think signalling on 
a condition variable is safer than interrupting a thread since its scope is 
well-defined. 
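   
   To illustrate the point, a small self-contained sketch of the `BlockingQueue` 
behaviour (not the `FetchBuffer` code):
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   public class BlockingQueuePollDemo {
       public static void main(String[] args) throws InterruptedException {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           // poll() returns only when an element arrives, the timeout elapses, or
           // the waiting thread is interrupted; there is no data-less wakeup
           // comparable to signalling a Condition.
           Object next = queue.poll(100, TimeUnit.MILLISECONDS);
           System.out.println(next); // prints "null" after the timeout
       }
   }
   ```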


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]

2023-11-17 Thread via GitHub


cadonna commented on PR #14779:
URL: https://github.com/apache/kafka/pull/14779#issuecomment-1815989094

   I agree with @philipnee that testing the locking behavior would be great. 
However, I also agree with @lucasbru that testing it is quite hard. Given the 
tight deadline for the next release, I propose creating a newbie ticket for 
testing the locking behavior and doing that in a follow-up PR.
   That said, I think some methods might be testable without any additional 
tooling. For example, `poll()` can be blocked waiting for fetches in the 
fetch buffer and then be unblocked once the locking is verified. I do not know 
whether we can do something similar for the other public methods.
   In any case, I think we should make a compromise, merge this PR without 
specific tests, and do the testing in a follow-up PR (Note: you will not 
hear/read this sentence often from me). 
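   
   One possible shape for such a test, as a sketch only (the consumer set-up, the 
exact exception type, and the fixed sleep used instead of proper synchronisation 
are all assumptions):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   import java.time.Duration;
   import java.util.ConcurrentModificationException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import org.apache.kafka.clients.consumer.Consumer;
   
   class ThreadConfinementTestSketch {
       // While one thread is blocked in poll() (an empty fetch buffer keeps it
       // waiting), a call from a second thread should be rejected immediately.
       void verifyCrossThreadCallRejected(Consumer<String, String> consumer) throws Exception {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           try {
               Future<?> polling = executor.submit(() -> consumer.poll(Duration.ofSeconds(5)));
               Thread.sleep(500); // crude: give the polling thread time to enter poll()
               assertThrows(ConcurrentModificationException.class, consumer::assignment);
               polling.cancel(true);
           } finally {
               executor.shutdownNow();
           }
       }
   }
   ```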


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in ReplicaManagerConcurrencyTest for Scala 2.12 [kafka]

2023-11-17 Thread via GitHub


dajac merged PR #14786:
URL: https://github.com/apache/kafka/pull/14786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error in ReplicaManagerConcurrencyTest for Scala 2.12 [kafka]

2023-11-17 Thread via GitHub


dajac commented on PR #14786:
URL: https://github.com/apache/kafka/pull/14786#issuecomment-1815931886

   Thanks @lucasbru! The compilation step has succeeded for all builds and I 
have also confirmed that the test passes locally with Scala 2.12 and 2.13. 
Therefore, I will go ahead and merge it to trunk to unblock all the other PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-17 Thread via GitHub


lucasbru commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1815926638

   Compilation with Scala 2.12 is broken on master; it is fixed in 
https://github.com/apache/kafka/pull/14786. Otherwise this is looking good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API

2023-11-17 Thread Zihao Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zihao Lin reassigned KAFKA-14507:
-

Assignee: Zihao Lin

> Add ConsumerGroupPrepareAssignment API
> --
>
> Key: KAFKA-14507
> URL: https://issues.apache.org/jira/browse/KAFKA-14507
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Zihao Lin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)