[jira] [Updated] (KAFKA-13144) Change the LOG level from DEBUG to WARN/ERROR for error traces when obtaining topic metadata

2021-07-29 Thread Fernando Blanch Calvete (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fernando Blanch Calvete updated KAFKA-13144:

Description: 
As a suggestion, it would be very useful that when obtaining the metadata 
information of the topic and there are errors, to add them to the log at WARN 
level (for example) instead of DEBUG. Since in productive environments we do 
not usually have the log at DEBUG level and if errors of this type occur it is 
very useful to know them at the moment.

For example, in the class 
*org.apache.kafka.clients.consumer.internals.Fetcher*, in the line 394, where 
the errors are traced in DEBUG it would be very useful for us that they are at 
least in WARN, to identify *UNKNOWN_TOPIC_OR_PARTITION* errors and to know the 
name of the topic that causes it.

 

EDITED:

On the other hand, when the client makes the call 
"*RequestFuture future = sendMetadataRequest(request);*" (line 
374 of *org.apache.kafka.clients.consumer.internals.Fetcher*) to the broker to 
get the response, if the broker is set to "*allowAutoTopicCreation=false*" and 
the topic does not exist, the response *UNKNOWN_TOPIC_TOPIC_OR_PARTITION* error 
has no topic name information, and this exception is thrown without any 
information to the client in this way:
{code:java}
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: 
This server does not host this topic-partition.{code}
It is scenarios with many consumer/producer configurations it is very useful to 
know the name of the topic that is causing the problem.

 

  was:
As a suggestion, it would be very useful that when obtaining the metadata 
information of the topic and there are errors, to add them to the log at WARN 
level (for example) instead of DEBUG. Since in productive environments we do 
not usually have the log at DEBUG level and if errors of this type occur it is 
very useful to know them at the moment.

For example, in the class 
*org.apache.kafka.clients.consumer.internals.Fetcher*, in the line 394, where 
the errors are traced in DEBUG it would be very useful for us that they are at 
least in WARN, to identify *UNKNOWN_TOPIC_OR_PARTITION* errors and to know the 
name of the topic that causes it.

 

 


> Change the LOG level from DEBUG to WARN/ERROR for error traces when obtaining 
> topic metadata
> 
>
> Key: KAFKA-13144
> URL: https://issues.apache.org/jira/browse/KAFKA-13144
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Fernando Blanch Calvete
>Priority: Major
>
> As a suggestion, it would be very useful that when obtaining the metadata 
> information of the topic and there are errors, to add them to the log at WARN 
> level (for example) instead of DEBUG. Since in productive environments we do 
> not usually have the log at DEBUG level and if errors of this type occur it 
> is very useful to know them at the moment.
> For example, in the class 
> *org.apache.kafka.clients.consumer.internals.Fetcher*, in the line 394, where 
> the errors are traced in DEBUG it would be very useful for us that they are 
> at least in WARN, to identify *UNKNOWN_TOPIC_OR_PARTITION* errors and to know 
> the name of the topic that causes it.
>  
> EDITED:
> On the other hand, when the client makes the call 
> "*RequestFuture future = sendMetadataRequest(request);*" 
> (line 374 of *org.apache.kafka.clients.consumer.internals.Fetcher*) to the 
> broker to get the response, if the broker is set to 
> "*allowAutoTopicCreation=false*" and the topic does not exist, the response 
> *UNKNOWN_TOPIC_TOPIC_OR_PARTITION* error has no topic name information, and 
> this exception is thrown without any information to the client in this way:
> {code:java}
> Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: 
> This server does not host this topic-partition.{code}
> It is scenarios with many consumer/producer configurations it is very useful 
> to know the name of the topic that is causing the problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678905114



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
+
+// we should support the "zero" time difference since sliding window 
is both start and end time inclusive
+final KTable, String> table = builder
+.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), 
ofMillis(50)))
+.aggregate(

Review comment:
   After this PR, we can create a window size 0 aggregation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678903334



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final 
long inputRecordTime
 }
 
 if (combinedWindow == null) {
-final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+// created a [start, end] time interval window via 
SessionWindow
+final SessionWindow window = new SessionWindow(0, 
windows.timeDifferenceMs());

Review comment:
   add a comment here to avoid confusion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678908862



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   Cannot create a store supplier with window size of 0 here because we use 
`TimeWindow` to represent the "windowed KTable" result. Use window size of 1 
instead. (Not sure if this should be an improvement or it is expected?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-83904


   @mjsax @ableegoldman @lct45 , I've added tests. Please help review. Thank 
you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11143: MINOR: close TopologyTestDriver to release resources

2021-07-29 Thread GitBox


showuon opened a new pull request #11143:
URL: https://github.com/apache/kafka/pull/11143


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10897) kafka quota optimization

2021-07-29 Thread Kahn Cheny (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389732#comment-17389732
 ] 

Kahn Cheny commented on KAFKA-10897:


Is this feature scheduled? 

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Priority: Blocker
>
> *1.The current quota dimensions is as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1.The quota dimensions is not fine enough.
> 2.2.When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota byte not exceed the upper throughput limit of 
> the broker.
> 2.3.Even if all the user rate does not reach the upper limit of the broker, 
> but all the user rate is concentrated on a few disks and exceeds the 
> read-write load of the disk, all the produce and consume requests will be 
> blocked.
> 2.4.Sometimes it's just one topic rate increase sharply under the user, so we 
> just need to limit the increase sharply topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] patrickstuedi commented on a change in pull request #11129: Fix for flaky test in StoreQueryIntegrationTest

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11129:
URL: https://github.com/apache/kafka/pull/11129#discussion_r678945688



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -152,11 +150,17 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 }
 return true;
 } catch (final InvalidStateStoreException exception) {
-assertThat(
-exception.getMessage(),
-containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
-);
-LOG.info("Streams wasn't running. Will try again.");
+final String message = exception.getMessage();
+final boolean exceptionNotRunning = message.startsWith("Cannot 
get state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
not RUNNING");
+if (exceptionNotRunning) {
+LOG.info("Streams wasn't running. Will try again.");
+}
+final boolean exceptionRebalanced = message.startsWith("The 
state store, source-table, may have migrated to another instance");
+if (exceptionRebalanced) {
+LOG.info("Rebalancing happened. Will try again.");
+}
+final boolean expectedException = exceptionNotRunning || 
exceptionRebalanced;
+assertThat(expectedException, is(true));

Review comment:
   Great, I was looking for something like oneOf, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r678965032



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##
@@ -65,9 +66,10 @@
  * Order is not guaranteed as bytes lexicographical ordering might not 
represent key order.
  *
  * @param from The first key that could be in the range, where iteration 
ends.
+ * A null value indicates that the range ends with the first 
element in the store.
  * @param to   The last key that could be in the range, where iteration 
starts from.
+ * A null value indicates a starting position from the last 
element in the store.

Review comment:
   Yes agree, from is still the lower bound in both range and reverse 
range. I guess the formulation in the javadoc is misleading, changing it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r678972106



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableEfficientRangeQueryTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private HashMap data;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+private Properties streamsConfig;
+
+public KTableEfficientRangeQueryTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+this.data = new HashMap<>();
+
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+data.put(key, value);
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+final int index = i * 2 - 1;
+innerLowBetween = "key-" + index;
+}
+if (i == DATA_SIZE - 2) {
+innerHigh = key;
+final int index = i * 2 + 1;
+innerHighBetween = "key-" + index;
+}
+}
+Assert.assertNotNull(low);
+Assert.assertNotNull(high);
+Assert.assertNotNull(middle);
+Assert.assertNotNull(innerLow);
+Assert.assertNotNull(innerHigh);
+Assert.assertNotNull(innerLowBetween);
+Assert.assertNotNull(innerHighBetween);
+}
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, 
enableCaching={2}, forward={3}")
+public static Collecti

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r678991669



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class RangeQueryIntegrationTest {
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+private static final Properties STREAMS_CONFIG = new Properties();
+private static final String APP_ID = "range-query-integration-test";
+private static final Long COMMIT_INTERVAL = 100L;
+private static String inputStream;
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private enum StoreType { InMemory, RocksDB, Timed };
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+private KafkaStreams kafkaStreams;
+
+private List> records;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+public RangeQueryIntegrationTest(final StoreType storeType, final boolean 
enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+
+records = new LinkedList<>();
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+records.add(new KeyValue<>(key, value));
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+final int index = i * 2 - 1;
+innerLowBetween = "key-" + index;
+}
+if (i == DATA_SIZE - 2) {
+ 

[GitHub] [kafka] showuon commented on a change in pull request #11129: Fix for flaky test in StoreQueryIntegrationTest

2021-07-29 Thread GitBox


showuon commented on a change in pull request #11129:
URL: https://github.com/apache/kafka/pull/11129#discussion_r679004926



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -153,10 +152,15 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 return true;
 } catch (final InvalidStateStoreException exception) {
 assertThat(
-exception.getMessage(),
-containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+exception.getMessage(),
+is(
+oneOf(
+containsString("Cannot get state store 
source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+containsString("The state store, 
source-table, may have migrated to another instance")
+)
+)

Review comment:
   nit: Could we add a error reason in this assertion, so that when the 
exception message is not one of these 2 messages, we can know what happened. 
Ex: "Unexpected exception thrown while getting the value from store."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?

2021-07-29 Thread wangjh (Jira)
wangjh created KAFKA-13150:
--

 Summary: How is Kafkastream configured to consume data from a 
specified offset ?
 Key: KAFKA-13150
 URL: https://issues.apache.org/jira/browse/KAFKA-13150
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: wangjh






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2021-07-29 Thread Raj (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389844#comment-17389844
 ] 

Raj commented on KAFKA-2729:


Hi [~junrao] ,

This was just hit in our production as well although I was able to resolve it 
by only restarting the broker that reported errors as opposed to the controller 
or the whole cluster.

Kafka version : 2.3.1

I can confirm the events are identical to what [~l0co]  explained above. 
 * ZK session disconnected on broker 5
 * Replica Fetchers stopped on other brokers
 * ZK Connection re-established on broker 5 after a few seconds
 * Broker 5 came back online and started reporting the "Cached zkVersion[130] 
not equal to..." and shrunk ISRs to only itself

As it didn't recover automatically, I restarted the broker after 30 minutes and 
it then went back to normal.

I did see that the controller tried to send correct metadata to broker 5 but 
which was rejected due to epoch inconsistency.
{noformat}
ERROR [KafkaApi-5] Error when handling request: clientId=21, correlationId=2, 
api=UPDATE_METADATA, 
body={controller_id=21,controller_epoch=53,broker_epoch=223338313060,topic_states=[{topic-a,partition_states=[{partition=0,controller_epoch=53,leader=25,leader_epoch=70,isr=[25,17],zk_version=131,replicas=[5,25,17],offline_replicas=[]}...
...
java.lang.IllegalStateException: Epoch 223338313060 larger than current broker 
epoch 223338311791
at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2612)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194)
at kafka.server.KafkaApis.handle(KafkaApis.scala:117)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.base/java.lang.Thread.run(Thread.java:834)
...
...
...
[2021-07-29 11:07:30,210] INFO [Partition topic-a-0 broker=5] Cached zkVersion 
[130] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
...

{noformat}
 

Preferred leader election error as seen on controller
{noformat}
[2021-07-29 11:11:57,432] ERROR [Controller id=21] Error completing preferred 
replica leader election for partition topic-a-0 
(kafka.controller.KafkaController)
kafka.common.StateChangeFailedException: Failed to elect leader for partition 
topic-a-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy
at 
kafka.controller.ZkPartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:381)
at 
kafka.controller.ZkPartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:378)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.controller.ZkPartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:378)
at 
kafka.controller.ZkPartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:305)
at 
kafka.controller.ZkPartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:215)
at 
kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:145)
at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:646)
at 
kafka.controller.KafkaController$$anonfun$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:995)
at 
kafka.controller.KafkaController$$anonfun$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:976)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at 
kafka.controller.KafkaController.checkAndTriggerAutoLeaderRebalance(KafkaController.scala:976)
at 
kafka.controller.KafkaController.processAutoPreferredReplicaLeaderElection(KafkaController.scala:1004)
at kafka.controller.KafkaController.process(KafkaController.scala:1564)
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:137)
at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:137)
at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:137)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:136)
at 
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89){noformat}
 

After the restart of broker-5, it was able to take back leadership of the 
desired partitions

 

Kindly l

[GitHub] [kafka] gklijs opened a new pull request #11144: Call the serialize method including headers from the MockProducer

2021-07-29 Thread GitBox


gklijs opened a new pull request #11144:
URL: https://github.com/apache/kafka/pull/11144


   Currently when using serializers like the Cloud Event Serializer, we need to 
do a work around so it doesn't throw an error. Using the method taking the 
headers would prevent this. Since the default implementation just calls the 
method without the headers, it's expected to be fully backwards compatible.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11091: MINOR: Fix `testResolveDnsLookup` by using a mocked dns resolver

2021-07-29 Thread GitBox


ijuma commented on pull request #11091:
URL: https://github.com/apache/kafka/pull/11091#issuecomment-889066685


   Cherry-picked to older branches (2.2...2.8).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13146) Consider client use cases for accessing controller endpoints

2021-07-29 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389899#comment-17389899
 ] 

Tom Bentley commented on KAFKA-13146:
-

{quote}We have also considered whether the internal __cluster_metadata topic 
should be readable through the controller endpoints by consumers.{quote}

That's what I had expected when originally reading the various KIPs, but 
exposing the metadata log comes with some downsides. Would consumers be able to 
do {{read_committed}} reads?

> Consider client use cases for accessing controller endpoints
> 
>
> Key: KAFKA-13146
> URL: https://issues.apache.org/jira/browse/KAFKA-13146
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> In KAFKA-13143, we dropped the Metadata from the controller APIs. We did this 
> for two reasons. First, the implementation did not return any topic metadata. 
> This was confusing for users who mistakenly tried to use the controller 
> endpoint in order to describe or list topics since it would appear that no 
> topics existed in the cluster. The second reason is that the implementation 
> returned the controller endpoints. So even if we returned the topic metadata, 
> clients would be unable to access the topics for reading or writing through 
> the controller endpoint.
> So for 3.0, we are effectively saying that clients should only access the 
> broker endpoints. Long term, is that what we want? When running the 
> controllers as separate nodes, it may be useful to initialize the controllers 
> and cluster metadata before starting any of the brokers, for example. For 
> this to work, we need to put some thought into how the Metadata API should 
> work with controllers. For example, we can return a flag or some kind of 
> error code in the response to indicate that topic metadata is not available. 
> We have also considered whether the internal __cluster_metadata topic should 
> be readable through the controller endpoints by consumers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679163645



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##
@@ -81,6 +82,16 @@ public void after() {
 return result;
 }
 
+private static List> getOrderedContents(final 
KeyValueIterator iter) {
+final LinkedList> result = new 
LinkedList<>();
+while (iter.hasNext()) {
+final KeyValue entry = iter.next();
+result.add(entry);
+}
+
+return result;
+}

Review comment:
   Makes sense. I changed to the code to use existing util functions 
whenever possible. If you think the changes overshoot let me know, I'm also not 
entirely sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679163645



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##
@@ -81,6 +82,16 @@ public void after() {
 return result;
 }
 
+private static List> getOrderedContents(final 
KeyValueIterator iter) {
+final LinkedList> result = new 
LinkedList<>();
+while (iter.hasNext()) {
+final KeyValue entry = iter.next();
+result.add(entry);
+}
+
+return result;
+}

Review comment:
   Makes sense. I changed to the code to use existing util functions 
whenever possible. If you think the changes overshoot let me know, I'm also not 
entirely sure/convinced.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679166423



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##
@@ -383,13 +394,29 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() 
{
 }
 
 @Test
-public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-assertThrows(NullPointerException.class, () -> store.range(null, 2));
+public void shouldReturnValueOnRangeNullFromKey() {

Review comment:
   Yup, good point, added. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10897) kafka quota optimization

2021-07-29 Thread yangyijun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389911#comment-17389911
 ] 

yangyijun commented on KAFKA-10897:
---

Not yet

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Priority: Blocker
>
> *1.The current quota dimensions is as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1.The quota dimensions is not fine enough.
> 2.2.When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota byte not exceed the upper throughput limit of 
> the broker.
> 2.3.Even if all the user rate does not reach the upper limit of the broker, 
> but all the user rate is concentrated on a few disks and exceeds the 
> read-write load of the disk, all the produce and consume requests will be 
> blocked.
> 2.4.Sometimes it's just one topic rate increase sharply under the user, so we 
> just need to limit the increase sharply topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10897) kafka quota optimization

2021-07-29 Thread yangyijun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yangyijun updated KAFKA-10897:
--
Comment: was deleted

(was: Not yet)

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Priority: Blocker
>
> *1.The current quota dimensions is as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1.The quota dimensions is not fine enough.
> 2.2.When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota byte not exceed the upper throughput limit of 
> the broker.
> 2.3.Even if all the user rate does not reach the upper limit of the broker, 
> but all the user rate is concentrated on a few disks and exceeds the 
> read-write load of the disk, all the produce and consume requests will be 
> blocked.
> 2.4.Sometimes it's just one topic rate increase sharply under the user, so we 
> just need to limit the increase sharply topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10897) kafka quota optimization

2021-07-29 Thread yangyijun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389913#comment-17389913
 ] 

yangyijun commented on KAFKA-10897:
---

It has not started yet

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Priority: Blocker
>
> *1.The current quota dimensions is as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1.The quota dimensions is not fine enough.
> 2.2.When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota byte not exceed the upper throughput limit of 
> the broker.
> 2.3.Even if all the user rate does not reach the upper limit of the broker, 
> but all the user rate is concentrated on a few disks and exceeds the 
> read-write load of the disk, all the produce and consume requests will be 
> blocked.
> 2.4.Sometimes it's just one topic rate increase sharply under the user, so we 
> just need to limit the increase sharply topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10897) kafka quota optimization

2021-07-29 Thread yangyijun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389918#comment-17389918
 ] 

yangyijun commented on KAFKA-10897:
---

Dear [~KahnCheny] , will you start this part of the work?

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Priority: Blocker
>
> *1.The current quota dimensions is as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1.The quota dimensions is not fine enough.
> 2.2.When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota byte not exceed the upper throughput limit of 
> the broker.
> 2.3.Even if all the user rate does not reach the upper limit of the broker, 
> but all the user rate is concentrated on a few disks and exceeds the 
> read-write load of the disk, all the produce and consume requests will be 
> blocked.
> 2.4.Sometimes it's just one topic rate increase sharply under the user, so we 
> just need to limit the increase sharply topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679180578



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##
@@ -383,13 +394,29 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() 
{
 }
 
 @Test
-public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-assertThrows(NullPointerException.class, () -> store.range(null, 2));
+public void shouldReturnValueOnRangeNullFromKey() {
+store.put(0, "zero");
+store.put(1, "one");
+store.put(2, "two");
+
+final LinkedList> expectedContents = new 
LinkedList<>();
+expectedContents.add(new KeyValue<>(0, "zero"));
+expectedContents.add(new KeyValue<>(1, "one"));
+
+assertEquals(expectedContents, getOrderedContents(store.range(null, 
1)));
 }
 
 @Test
-public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-assertThrows(NullPointerException.class, () -> store.range(2, null));
+public void shouldReturnValueOnRangeNullToKey() {

Review comment:
   Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679214947



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableEfficientRangeQueryTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private HashMap data;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+private Properties streamsConfig;
+
+public KTableEfficientRangeQueryTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+this.data = new HashMap<>();
+
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+data.put(key, value);
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+final int index = i * 2 - 1;
+innerLowBetween = "key-" + index;
+}
+if (i == DATA_SIZE - 2) {
+innerHigh = key;
+final int index = i * 2 + 1;
+innerHighBetween = "key-" + index;
+}
+}
+Assert.assertNotNull(low);
+Assert.assertNotNull(high);
+Assert.assertNotNull(middle);
+Assert.assertNotNull(innerLow);
+Assert.assertNotNull(innerHigh);
+Assert.assertNotNull(innerLowBetween);
+Assert.assertNotNull(innerHighBetween);
+}
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, 
enableCaching={2}, forward={3}")
+public static Collecti

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11129: Fix for flaky test in StoreQueryIntegrationTest

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11129:
URL: https://github.com/apache/kafka/pull/11129#discussion_r679220475



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -153,10 +152,15 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 return true;
 } catch (final InvalidStateStoreException exception) {
 assertThat(
-exception.getMessage(),
-containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+exception.getMessage(),
+is(
+oneOf(
+containsString("Cannot get state store 
source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+containsString("The state store, 
source-table, may have migrated to another instance")
+)
+)

Review comment:
   yup, makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10978: MINOR: Use time constant algorithms when comparing passwords or keys

2021-07-29 Thread GitBox


rhauch commented on pull request #10978:
URL: https://github.com/apache/kafka/pull/10978#issuecomment-889222564


   @mimaison, @omkreddy, @rajinisivaram, @ijuma: would some of you be able to 
review these changes? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


vvcephei commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679231292



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableEfficientRangeQueryTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private LinkedList> records;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+private Properties streamsConfig;
+
+public KTableEfficientRangeQueryTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+
+this.records = new LinkedList<>();
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+records.add(new KeyValue<>(key, value));
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+final int index = i * 2 - 1;
+innerLowBetween = "key-" + index;
+}
+if (i == DATA_SIZE - 2) {
+innerHigh = key;
+final int index = i * 2 + 1;
+innerHighBetween = "key-" + index;
+}
+}
+Assert.assertNotNull(low);
+Assert.assertNotNull(high);
+Assert.assertNotNull(middle);
+Assert.assertNotNull(innerLow);
+Assert.assertNotNull(innerHigh);
+Assert.assertNotNull(innerLowBetween);
+Assert.assertNotNull(innerHighBetween);
+}
+
+@Rule
+public TestName testName = new TestName()

[GitHub] [kafka] vvcephei commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


vvcephei commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679242346



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableEfficientRangeQueryTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private HashMap data;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+private Properties streamsConfig;
+
+public KTableEfficientRangeQueryTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+this.data = new HashMap<>();
+
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+data.put(key, value);
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+final int index = i * 2 - 1;
+innerLowBetween = "key-" + index;
+}
+if (i == DATA_SIZE - 2) {
+innerHigh = key;
+final int index = i * 2 + 1;
+innerHighBetween = "key-" + index;
+}
+}
+Assert.assertNotNull(low);
+Assert.assertNotNull(high);
+Assert.assertNotNull(middle);
+Assert.assertNotNull(innerLow);
+Assert.assertNotNull(innerHigh);
+Assert.assertNotNull(innerLowBetween);
+Assert.assertNotNull(innerHighBetween);
+}
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, 
enableCaching={2}, forward={3}")
+public static Collection da

[jira] [Commented] (KAFKA-2376) Add Kafka Connect metrics

2021-07-29 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389959#comment-17389959
 ] 

Randall Hauch commented on KAFKA-2376:
--

[~ramkrish1489], it's not uncommon for a KIP to propose metrics that are 
implemented in multiple releases. In this case, I think other things took 
priority.

We'd welcome any contributions, though in this case it'd be through a new KAFKA 
issue that references this issue and/or the KIP.

> Add Kafka Connect metrics
> -
>
> Key: KAFKA-2376
> URL: https://issues.apache.org/jira/browse/KAFKA-2376
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Randall Hauch
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 1.0.0
>
>
> Kafka Connect needs good metrics for monitoring since that will be the 
> primary insight into the health of connectors as they copy data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wycccccc commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2021-07-29 Thread GitBox


wycc commented on pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#issuecomment-889261189


   @ableegoldman Have resloved conflict, thanks for your review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #11135: KAFKA-13143 Remove HandleMetadata from ControllerAPis as metadata is not completely implemented on KRaft controllers

2021-07-29 Thread GitBox


hachikuji merged pull request #11135:
URL: https://github.com/apache/kafka/pull/11135


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13143) Disable Metadata endpoint for KRaft controller

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13143.
-
Resolution: Fixed

> Disable Metadata endpoint for KRaft controller
> --
>
> Key: KAFKA-13143
> URL: https://issues.apache.org/jira/browse/KAFKA-13143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The controller currently implements Metadata incompletely. Specifically, it 
> does not return the metadata for any topics in the cluster. This may tend to 
> cause confusion to users. For example, if someone used the controller 
> endpoint by mistake in `kafka-topics.sh --list`, then they would see no 
> topics in the cluster, which would be surprising. It would be better for 3.0 
> to disable Metadata on the controller since we currently expect clients to 
> connect through brokers anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

2021-07-29 Thread GitBox


dielhennr commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-889296194


   I stacked https://github.com/cmccabe/kafka/pull/6 on this PR to address some 
of Jun's comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11140: MINOR: log epoch and offset truncation similarly to HWM truncation

2021-07-29 Thread GitBox


hachikuji commented on a change in pull request #11140:
URL: https://github.com/apache/kafka/pull/11140#discussion_r679314640



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -247,7 +247,7 @@ abstract class AbstractFetcherThread(name: String,
 val highWatermark = partitionState.fetchOffset
 val truncationState = OffsetTruncationState(highWatermark, 
truncationCompleted = true)
 
-info(s"Truncating partition $tp to local high watermark 
$highWatermark")
+info(s"Truncating partition $tp to $truncationState due to local high 
watermark $highWatermark")

Review comment:
   nit: the `toString` for `OffsetTruncationState` is a little 
unconventional. Maybe we could replace it with something like this:
   ```scala
 override def toString: String = s"TruncationState(offset=$offset, 
completed:$truncationCompleted)"
   ```
   Then we can change this from "to" to "with" so the message reads (e.g.):
   > Truncating partition foo-0 with TruncationState(offset=19, completed=true) 
based on local high watermark 19.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10909: KAFKA-12158: Better return type of RaftClient.scheduleAppend

2021-07-29 Thread GitBox


jsancio commented on a change in pull request #10909:
URL: https://github.com/apache/kafka/pull/10909#discussion_r679326328



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2248,24 +2248,23 @@ public void poll() {
 }
 
 @Override
-public Long scheduleAppend(int epoch, List records) {
+public long scheduleAppend(int epoch, List records) {
 return append(epoch, records, false);
 }
 
 @Override
-public Long scheduleAtomicAppend(int epoch, List records) {
+public long scheduleAtomicAppend(int epoch, List records) {
 return append(epoch, records, true);
 }
 
-private Long append(int epoch, List records, boolean isAtomic) {
-Optional> leaderStateOpt = quorum.maybeLeaderState();
-if (!leaderStateOpt.isPresent()) {
-return Long.MAX_VALUE;
-}
+private long append(int epoch, List records, boolean isAtomic) {
+LeaderState leaderState = quorum.maybeLeaderState().orElseThrow(
+() -> new IllegalStateException("Can not get offset because we are 
not the current leader")

Review comment:
   I think that having an offset is a side effect of appending to the 
accumulator. How about:
   "Cannot append records because the replica is not the leader".

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -147,8 +147,9 @@ default void beginShutdown() {}
  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if 
the size of the records is greater than the maximum

Review comment:
   This comment also applies to the `scheduleAtomicAppend` documentation.
   
   This is for the @return tag but we need to update the documentation about 
returning null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-29 Thread GitBox


vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679342991



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -171,38 +179,40 @@ private static FilterResult filterTo(TopicPartition 
partition, Iterable retainedRecords = new ArrayList<>();
 
-try (final CloseableIterator iterator = 
batch.streamingIterator(decompressionBufferSupplier)) {
-while (iterator.hasNext()) {
-Record record = iterator.next();
-filterResult.messagesRead += 1;
-
-if (filter.shouldRetainRecord(batch, record)) {
-// Check for log corruption due to KAFKA-4298. If we 
find it, make sure that we overwrite
-// the corrupted batch with correct data.
-if (!record.hasMagic(batchMagic))
-writeOriginalBatch = false;
-
-if (record.offset() > maxOffset)
-maxOffset = record.offset();
-
-retainedRecords.add(record);
-} else {
-writeOriginalBatch = false;
-}
-}
-}
+final BatchFilterResult iterationResult = filterBatch(batch, 
decompressionBufferSupplier, filterResult, filter,
+  
batchMagic, true, maxOffset, retainedRecords);
+boolean containsTombstones = iterationResult.containsTombstones();
+boolean writeOriginalBatch = 
iterationResult.shouldWriteOriginalBatch();
+maxOffset = iterationResult.maxOffset();
 
 if (!retainedRecords.isEmpty()) {
-if (writeOriginalBatch) {
+// we check if the delete horizon should be set to a new value
+// in which case, we need to reset the base timestamp and 
overwrite the timestamp deltas
+// if the batch does not contain tombstones, then we don't 
need to overwrite batch
+boolean needToSetDeleteHorizon = batch.magic() >= 2 && 
(containsTombstones || containsMarkerForEmptyTxn)

Review comment:
   RecordBatch.MAGIC_VALUE_V2?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-29 Thread GitBox


vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679343797



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
*/
   private[log] def cleanInto(topicPartition: TopicPartition,
  sourceRecords: FileRecords,
  dest: LogSegment,
  map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
  maxLogMessageSize: Int,
  transactionMetadata: CleanedTransactionMetadata,
  lastRecordsOfActiveProducers: Map[Long, 
LastRecord],
- stats: CleanerStats): Unit = {
-val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Long = {
+var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, 
deleteRetentionMs) {
   var discardBatchRecords: Boolean = _
 
-  override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+  override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult = {
 // we piggy-back on the tombstone retention logic to delay deletion of 
transaction markers.
 // note that we will never delete a marker until all the records from 
that transaction are removed.
-discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, 
retainTxnMarkers = retainDeletesAndTxnMarkers)
+val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+if (batch.isControlBatch) {
+  if (batch.magic() < 2) {

Review comment:
   RecordBatch.MAGIC_VALUE_V2




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vincent81jiang commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-29 Thread GitBox


vincent81jiang commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679346836



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -239,9 +249,68 @@ private static FilterResult filterTo(TopicPartition 
partition, Iterable

[jira] [Updated] (KAFKA-13095) TransactionsTest is failing in kraft mode

2021-07-29 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-13095:
-
Description: TransactionsTest#testSendOffsetsToTransactionTimeout keeps 
flaking on Jenkins.

> TransactionsTest is failing in kraft mode
> -
>
> Key: KAFKA-13095
> URL: https://issues.apache.org/jira/browse/KAFKA-13095
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> TransactionsTest#testSendOffsetsToTransactionTimeout keeps flaking on Jenkins.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679367945



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))

Review comment:
   Not sure how to handle the `alterConfigPolicy` in KRaft. In the Zk case 
this is kept in `ZkAdminManager` which closes `alterConfigPolicy` when 
`KafkaServer` calls `adminManager.shutdown()`. Since KRaft does not have this 
`adminManager` I'm not sure where the `alterConfigPolicy` should be closed. 
@cmccabe Any thoughts about this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679367945



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))

Review comment:
   Not sure how to handle the `alterConfigPolicy` in KRaft. In the Zk case 
this is kept in `ZkAdminManager` which closes `alterConfigPolicy` when 
`KafkaServer` calls `adminManager.shutdown()`. Since KRaft does not have this 
`adminManager` I'm not sure where the `alterConfigPolicy` should be closed. 
@cmccabe @hachikuji  Any thoughts about this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679380487



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
+
+  def validateConfigPolicy(resource: ConfigResource, configEntriesMap: 
Map[String, String]): Unit = {
+this.alterConfigPolicy match {
+  case Some(policy) =>
+policy.validate(new AlterConfigPolicy.RequestMetadata(
+  new ConfigResource(resource.`type`(), resource.name), 
configEntriesMap.asJava))
+  case None =>
+}
+  }
+
+  def getAndValidateBrokerId(resource: ConfigResource) = {
+if (resource.name == null || resource.name.isEmpty)
+  None
+else {
+  val id = resourceNameToBrokerId(resource.name)
+  if (id != this.config.brokerId)
+throw new InvalidRequestException(s"Unexpected broker id, expected 
${this.config.brokerId}, but received ${resource.name}")
+  Some(id)
+}
+  }
+
+  def validateBrokerConfigs(resource: ConfigResource, 

Review comment:
   Separating validation step from the persisting step. Previously 
alterBrokerConfig in `ZkAdminManager` did both steps. This will be helpful when 
enabling request forwarding by default for ZK brokers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679380487



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
+
+  def validateConfigPolicy(resource: ConfigResource, configEntriesMap: 
Map[String, String]): Unit = {
+this.alterConfigPolicy match {
+  case Some(policy) =>
+policy.validate(new AlterConfigPolicy.RequestMetadata(
+  new ConfigResource(resource.`type`(), resource.name), 
configEntriesMap.asJava))
+  case None =>
+}
+  }
+
+  def getAndValidateBrokerId(resource: ConfigResource) = {
+if (resource.name == null || resource.name.isEmpty)
+  None
+else {
+  val id = resourceNameToBrokerId(resource.name)
+  if (id != this.config.brokerId)
+throw new InvalidRequestException(s"Unexpected broker id, expected 
${this.config.brokerId}, but received ${resource.name}")
+  Some(id)
+}
+  }
+
+  def validateBrokerConfigs(resource: ConfigResource, 

Review comment:
   Separating validation step from the persisting step. Previously 
`alterBrokerConfig` in `ZkAdminManager` did both steps. This will be helpful 
when enabling request forwarding by default for ZK brokers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679381726



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))

Review comment:
   Creating another PR soon to fail startup if Policies are configures.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679381726



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))

Review comment:
   Creating another PR soon to fail startup if Policies are configured.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679381726



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))

Review comment:
   Creating another PR soon to fail startup if policies are configured.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread GitBox


hachikuji commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r679344612



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1392,11 +1393,24 @@ class ReplicaManager(val config: KafkaConfig,
   s"leader epoch $currentLeaderEpoch")
 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
   } else {
-stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-  s"controller $controllerId with correlation id 
$correlationId " +
-  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
-responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+// The controller may send LeaderAndIsr to upgrade to using 
topic IDs without bumping the epoch.

Review comment:
   nit: does it makes sense to move this comment into the first `case`? 

##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 assertEquals(topicIdAfterUpgrade.get, topicId)
 assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+assertEquals(Some(topicId), topicIdInLog)
+
+adminZkClient.deleteTopic(tp.topic)
+TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+  "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+// Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+servers = servers ++ makeServers(3, startingIdNumber = 1)
+val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+assertEquals(0, originalControllerId)
+val controller = getController().kafkaController
+val remainingBrokers = servers.filter(_.config.brokerId != 
originalControllerId)
+val tp = new TopicPartition("t", 0)
+val assignment = Map(tp.partition -> servers.map(_.config.brokerId))
+TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+waitForPartitionState(tp, firstControllerEpoch, originalControllerId, 
LeaderAndIsr.initialLeaderEpoch,
+  "failed to get expected partition state upon topic creation")
+val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+assertEquals(None, topicIdAfterCreate)
+val emptyTopicId = controller.controllerContext.topicIds.get("t")
+assertEquals(None, emptyTopicId)
+
+// All logs should not have topic IDs
+servers.foreach({ server =>

Review comment:
   nit: usual pattern is
   ```scala
   servers.foreach { server =>
   ```
   More of these in here.

##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 assertEquals(topicIdAfterUpgrade.get, topicId)
 assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+assertEquals(Some(topicId), topicIdInLog)
+
+adminZkClient.deleteTopic(tp.topic)
+TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+  "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+// Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+servers = servers ++ makeServers(3, startingIdNumber = 1)
+val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+assertEquals(0, originalControllerId)
+val controller = getController().kafkaController
+val remainingBrokers = servers.filter(_.config.brokerId != 
originalControllerId)
+val tp = new TopicPartition("t", 0)
+val assignment = Map(tp.partition -> servers.map(_.config.brokerId))

Review comment:
   Do we want to use `remainingBrokers` so that the controller is not part 
of the assignment?

##
File path: c

[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-07-29 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390068#comment-17390068
 ] 

Ming Liu commented on KAFKA-12713:
--

Certainly, [~kaihuang], please take the ownership of the KIP and the PR. 

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metrics to monitor for the cluster 
> performance. With ACK=ALL, the produce latency is affected primarily by 
> broker fetch latency.
> However, currently the reported fetch latency didn't reflect the true fetch 
> latency because it sometimes need to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affect the 
> real P50, P99 etc. 
> I like to propose a KIP to be able track the real fetch latency for both 
> broker follower and consumer. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-07-29 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r679386530



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -239,9 +249,68 @@ private static FilterResult filterTo(TopicPartition 
partition, Iterable

[GitHub] [kafka] jolshan commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread GitBox


jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r679387634



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 assertEquals(topicIdAfterUpgrade.get, topicId)
 assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+assertEquals(Some(topicId), topicIdInLog)
+
+adminZkClient.deleteTopic(tp.topic)
+TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+  "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+// Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+servers = servers ++ makeServers(3, startingIdNumber = 1)
+val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+assertEquals(0, originalControllerId)
+val controller = getController().kafkaController
+val remainingBrokers = servers.filter(_.config.brokerId != 
originalControllerId)
+val tp = new TopicPartition("t", 0)
+val assignment = Map(tp.partition -> servers.map(_.config.brokerId))

Review comment:
   We can do that. As an aside, we don't store the topic ID on the original 
controller since it has a lower IBP when it restarts in this test. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread GitBox


jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r679390415



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1392,11 +1393,24 @@ class ReplicaManager(val config: KafkaConfig,
   s"leader epoch $currentLeaderEpoch")
 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
   } else {
-stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-  s"controller $controllerId with correlation id 
$correlationId " +
-  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
-responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+// The controller may send LeaderAndIsr to upgrade to using 
topic IDs without bumping the epoch.
+val error = requestTopicId match {
+  case Some(topicId) if logTopicId.isEmpty =>
+// If we have a matching epoch, we expect the log to be 
defined.
+val log = localLogOrException(partition.topicPartition)
+log.assignTopicId(topicId)
+stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
+  s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation" +
+  s" id $correlationId epoch $controllerEpoch")

Review comment:
   ah i meant to do that. I missed it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9858) CVE-2016-3189 Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 allows remote attackers to cause a denial of service (crash) via a crafted bzip2 file, related

2021-07-29 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9858.
--
Fix Version/s: 3.0.0
   Resolution: Fixed

> CVE-2016-3189  Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 
> allows remote attackers to cause a denial of service (crash) via a crafted 
> bzip2 file, related to block ends set to before the start of the block.
> -
>
> Key: KAFKA-9858
> URL: https://issues.apache.org/jira/browse/KAFKA-9858
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: sihuanx
>Priority: Major
> Fix For: 3.0.0
>
>
> I'm not sure whether  CVE-2016-3189 affects kafka 2.4.1  or not?  This 
> vulnerability  was related to rocksdbjni-5.18.3.jar  which is compiled with 
> *bzip2 .* 
> Is there any task or plan to fix it? 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9858) CVE-2016-3189 Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 allows remote attackers to cause a denial of service (crash) via a crafted bzip2 file, relate

2021-07-29 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390071#comment-17390071
 ] 

Guozhang Wang commented on KAFKA-9858:
--

Hi [~sihuan][~microle.dong] in https://issues.apache.org/jira/browse/KAFKA-8897 
we've upgraded to rocksDB: "6.19.3", so I think we can mark it as resolved in 
the upcoming 3.0

> CVE-2016-3189  Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 
> allows remote attackers to cause a denial of service (crash) via a crafted 
> bzip2 file, related to block ends set to before the start of the block.
> -
>
> Key: KAFKA-9858
> URL: https://issues.apache.org/jira/browse/KAFKA-9858
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: sihuanx
>Priority: Major
>
> I'm not sure whether  CVE-2016-3189 affects kafka 2.4.1  or not?  This 
> vulnerability  was related to rocksdbjni-5.18.3.jar  which is compiled with 
> *bzip2 .* 
> Is there any task or plan to fix it? 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread GitBox


jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r679404417



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1392,11 +1393,24 @@ class ReplicaManager(val config: KafkaConfig,
   s"leader epoch $currentLeaderEpoch")
 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
   } else {
-stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-  s"controller $controllerId with correlation id 
$correlationId " +
-  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
-responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+// The controller may send LeaderAndIsr to upgrade to using 
topic IDs without bumping the epoch.

Review comment:
   I originally put it there to explain why we had the case in the first 
place, but it still gets the point across in the first case. I'll move it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-07-29 Thread GitBox


vvcephei merged pull request #10693:
URL: https://github.com/apache/kafka/pull/10693


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679413628



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   Are you saying the `CachingWindowStore` internally uses a `TimeWindow`? 
Or is the `TimeWindow` somewhere along the store supplier code path...?
   
   Either way, doesn't this mean there's still a hole in the API since you 
can't use a custom WindowStore for a sliding windowed aggregation with the 
windowSize set to 0? If the WindowStore is going to represent different kinds 
of constant-size windows, it should probably be agnostic to the specific type 
of constant-sized window.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?

2021-07-29 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390096#comment-17390096
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13150:


[~wangjh] Jira is intended only for bug reports and feature requests, please us 
the mailing list for questions

> How is Kafkastream configured to consume data from a specified offset ?
> ---
>
> Key: KAFKA-13150
> URL: https://issues.apache.org/jira/browse/KAFKA-13150
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: wangjh
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?

2021-07-29 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13150.

Resolution: Invalid

> How is Kafkastream configured to consume data from a specified offset ?
> ---
>
> Key: KAFKA-13150
> URL: https://issues.apache.org/jira/browse/KAFKA-13150
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: wangjh
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-07-29 Thread GitBox


jlprat commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-889393251


   Thanks @vvcephei for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-07-29 Thread Kai Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kai Huang reassigned KAFKA-12713:
-

Assignee: Kai Huang

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> The fetch latency is an important metrics to monitor for the cluster 
> performance. With ACK=ALL, the produce latency is affected primarily by 
> broker fetch latency.
> However, currently the reported fetch latency didn't reflect the true fetch 
> latency because it sometimes need to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affect the 
> real P50, P99 etc. 
> I like to propose a KIP to be able track the real fetch latency for both 
> broker follower and consumer. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #11143: MINOR: close TopologyTestDriver to release resources

2021-07-29 Thread GitBox


bbejeck commented on pull request #11143:
URL: https://github.com/apache/kafka/pull/11143#issuecomment-889413715


   failure unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #11143: MINOR: close TopologyTestDriver to release resources

2021-07-29 Thread GitBox


bbejeck merged pull request #11143:
URL: https://github.com/apache/kafka/pull/11143


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #11143: MINOR: close TopologyTestDriver to release resources

2021-07-29 Thread GitBox


bbejeck commented on pull request #11143:
URL: https://github.com/apache/kafka/pull/11143#issuecomment-889414313


   Merged #11143 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #11131: KAFKA-13137: KRaft Controller Metric MBean names incorrectly quoted

2021-07-29 Thread GitBox


hachikuji merged pull request #11131:
URL: https://github.com/apache/kafka/pull/11131


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13137) KRaft Controller Metric MBean names are incorrectly quoted

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13137:

Affects Version/s: (was: 3.0.0)

> KRaft Controller Metric MBean names are incorrectly quoted
> --
>
> Key: KAFKA-13137
> URL: https://issues.apache.org/jira/browse/KAFKA-13137
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.8.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Blocker
> Fix For: 3.0.0
>
>
> QuorumControllerMetrics is letting com.yammer.metrics.MetricName create the 
> MBean names for all of the controller metrics, and that adds quotes.  We have 
> typically used KafkaMetricsGroup to explicitly create the MBean name, and we 
> do not add quotes there.  The controller metric names that are in common 
> between the old and new controller must remain the same, but they are not.  
> For example, this non-KRaft MBean name:
> kafka.controller:type=KafkaController,name=OfflinePartitionsCount
> has morphed into this when using KRaft:
> "kafka.controller":type="KafkaController",name="OfflinePartitionsCount"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10909: KAFKA-12158: Better return type of RaftClient.scheduleAppend

2021-07-29 Thread GitBox


hachikuji commented on a change in pull request #10909:
URL: https://github.com/apache/kafka/pull/10909#discussion_r679390697



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -186,13 +186,18 @@ class KafkaRaftManager[T](
 records: Seq[T],
 isAtomic: Boolean
   ): Option[Long] = {
-val offset = if (isAtomic) {
-  client.scheduleAtomicAppend(epoch, records.asJava)
-} else {
-  client.scheduleAppend(epoch, records.asJava)
+try {

Review comment:
   I'm looking into the usages here. It looks like we have no uses for 
`RaftManager.scheduleAtomicAppend` and the only use of `scheduleAppend` is in 
`TestRaftServer`. Maybe we can remove these two methods and change 
`TestRaftServer` to write to the client directly (since it is exposed by 
`RaftManager` already).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10909: KAFKA-12158: Better return type of RaftClient.scheduleAppend

2021-07-29 Thread GitBox


hachikuji commented on a change in pull request #10909:
URL: https://github.com/apache/kafka/pull/10909#discussion_r679391787



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -61,10 +61,12 @@ public synchronized void increment() {
 
 int epoch = claimedEpoch.getAsInt();
 uncommitted += 1;
-Long offset = client.scheduleAppend(epoch, singletonList(uncommitted));
-if (offset != null) {
+try {
+long offset = client.scheduleAppend(epoch, 
singletonList(uncommitted));
 log.debug("Scheduled append of record {} with epoch {} at offset 
{}",
 uncommitted, epoch, offset);
+} catch (IllegalStateException e) {

Review comment:
   How about we catch `Exception` generally? 

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -148,7 +150,7 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 }
 
 if (batch == null) {
-return null;
+throw new IllegalStateException("Append failed because we 
failed to allocate memory to write the batch");

Review comment:
   I wonder if it is worthwhile introducing some custom raft exception 
types for these two cases. At a minimum, that would let us improve the 
assertions. Maybe `NotLeaderException` and `BufferAllocationException` or 
something like that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679380487



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
+
+  def validateConfigPolicy(resource: ConfigResource, configEntriesMap: 
Map[String, String]): Unit = {
+this.alterConfigPolicy match {
+  case Some(policy) =>
+policy.validate(new AlterConfigPolicy.RequestMetadata(
+  new ConfigResource(resource.`type`(), resource.name), 
configEntriesMap.asJava))
+  case None =>
+}
+  }
+
+  def getAndValidateBrokerId(resource: ConfigResource) = {
+if (resource.name == null || resource.name.isEmpty)
+  None
+else {
+  val id = resourceNameToBrokerId(resource.name)
+  if (id != this.config.brokerId)
+throw new InvalidRequestException(s"Unexpected broker id, expected 
${this.config.brokerId}, but received ${resource.name}")
+  Some(id)
+}
+  }
+
+  def validateBrokerConfigs(resource: ConfigResource, 

Review comment:
   Separating validation step from the persisting step. Previously 
`alterBrokerConfig` in `ZkAdminManager` did both steps. This will be helpful 
when enabling request forwarding by default for ZK brokers as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #11120: Add support for infinite endpoints for range queries

2021-07-29 Thread GitBox


patrickstuedi commented on a change in pull request #11120:
URL: https://github.com/apache/kafka/pull/11120#discussion_r679468196



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class RangeQueryIntegrationTest {
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+private static final Properties STREAMS_CONFIG = new Properties();
+private static final String APP_ID = "range-query-integration-test";
+private static final Long COMMIT_INTERVAL = 100L;
+private static String inputStream;
+private static final String TABLE_NAME = "mytable";
+private static final int DATA_SIZE = 5;
+
+private enum StoreType { InMemory, RocksDB, Timed };
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+private KafkaStreams kafkaStreams;
+
+private LinkedList> records;
+private String low;
+private String high;
+private String middle;
+private String innerLow;
+private String innerHigh;
+private String innerLowBetween;
+private String innerHighBetween;
+
+public RangeQueryIntegrationTest(final StoreType storeType, final boolean 
enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+
+records = new LinkedList<>();
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+records.add(new KeyValue<>(key, value));
+high = key;
+if (low == null) {
+low = key;
+}
+if (i == m) {
+middle = key;
+}
+if (i == 1) {
+innerLow = key;
+fina

[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

2021-07-29 Thread GitBox


jsancio commented on a change in pull request #6:
URL: https://github.com/apache/kafka/pull/6#discussion_r679475174



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -580,12 +580,17 @@ public void run() throws Exception {
 // written before we can return our result to the user.  Here, 
we hand off
 // the batch of records to the raft client.  They will be 
written out
 // asynchronously.
-final long offset;
+final Long offset;
 if (result.isAtomic()) {
 offset = raftClient.scheduleAtomicAppend(controllerEpoch, 
result.records());
 } else {
 offset = raftClient.scheduleAppend(controllerEpoch, 
result.records());
 }
+if (offset == null) {
+throw new IllegalStateException("The raft client was 
unable to allocate a buffer for an append");
+} else if (offset == Long.MAX_VALUE) {
+throw new IllegalStateException("Unable to append records 
since this is not the leader");
+}

Review comment:
   This is a partial fix until we merge 
https://github.com/apache/kafka/pull/10909




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11129: Fix for flaky test in StoreQueryIntegrationTest

2021-07-29 Thread GitBox


vvcephei merged pull request #11129:
URL: https://github.com/apache/kafka/pull/11129


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11129: MINOR: Fix for flaky test in StoreQueryIntegrationTest

2021-07-29 Thread GitBox


vvcephei commented on pull request #11129:
URL: https://github.com/apache/kafka/pull/11129#issuecomment-889472160


   Oy. I didn't notice the commit title was wrongly formatted before merging. 
Fixed in the PR title anyway.
   
   Thanks for this contribution, @patrickstuedi ! And thanks for the review, 
@showuon !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13151) KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-13151:
-

 Summary: KRaft does not support Policies (e.g. AlterConfigPolicy). 
The server should fail startup if any are configured.
 Key: KAFKA-13151
 URL: https://issues.apache.org/jira/browse/KAFKA-13151
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.0.0
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn


log.cleanup.policy, alter.config.policy.class.name, and 
create.topic.policy.class.name are all unsupported by KRaft. KRaft servers 
should fail startup if any of these are configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread GitBox


hachikuji merged pull request #11126:
URL: https://github.com/apache/kafka/pull/11126


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13151) KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13151:
--
Description: alter.config.policy.class.name and 
create.topic.policy.class.name are unsupported by KRaft. KRaft servers should 
fail startup if any of these are configured.  (was: 
alter.config.policy.class.name and create.topic.policy.class.name are all 
unsupported by KRaft. KRaft servers should fail startup if any of these are 
configured.)

> KRaft does not support Policies (e.g. AlterConfigPolicy). The server should 
> fail startup if any are configured.
> ---
>
> Key: KAFKA-13151
> URL: https://issues.apache.org/jira/browse/KAFKA-13151
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
>
> alter.config.policy.class.name and create.topic.policy.class.name are 
> unsupported by KRaft. KRaft servers should fail startup if any of these are 
> configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13151) KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13151:
--
Description: alter.config.policy.class.name and 
create.topic.policy.class.name are all unsupported by KRaft. KRaft servers 
should fail startup if any of these are configured.  (was: log.cleanup.policy, 
alter.config.policy.class.name, and create.topic.policy.class.name are all 
unsupported by KRaft. KRaft servers should fail startup if any of these are 
configured.)

> KRaft does not support Policies (e.g. AlterConfigPolicy). The server should 
> fail startup if any are configured.
> ---
>
> Key: KAFKA-13151
> URL: https://issues.apache.org/jira/browse/KAFKA-13151
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
>
> alter.config.policy.class.name and create.topic.policy.class.name are all 
> unsupported by KRaft. KRaft servers should fail startup if any of these are 
> configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13132:
---

Assignee: Justine Olshan  (was: Jose Armando Garcia Sancio)

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will be still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check the if the LISR request is resulting in a topic ID addition 
> and add logic to logs already associated to partitions in replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13132.
-
Resolution: Fixed

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will be still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check the if the LISR request is resulting in a topic ID addition 
> and add logic to logs already associated to partitions in replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-07-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13132:

Affects Version/s: (was: 3.0.0)

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last. (Ie, the controller 
> is IBP < 2.8 and all other brokers are on the newest IBP) Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will be still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we are not ensured the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check)
>  2. We only assign the topic ID when we are associating the log with the 
> partition in replicamanager for the first time. Though in the scenario 
> described above, we have logs associated with partitions that need to be 
> upgraded.
>  
> We should check the if the LISR request is resulting in a topic ID addition 
> and add logic to logs already associated to partitions in replica manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679517755



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2740,23 +2798,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   }.toBuffer
 }.toMap
 
-val (authorizedResources, unauthorizedResources) = configs.partition { 
case (resource, _) =>
-  resource.`type` match {
-case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)
-case ConfigResource.Type.TOPIC =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC, 
resource.name)
-case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
-  }
-}
+if (!request.isForwarded && config.usesSelfManagedQuorum) {
+  // If using KRaft, per broker config alterations should be validated on 
the broker that the config(s) is for before forwarding them to the controller
+  val results = configs.map { case (resource, alterConfigOps) =>
+try {
+  if (resource.`type` == ConfigResource.Type.BROKER) {
+  // In ZK case, the old config is retrieved, altered then 
validated
+  // val persistentProps = if (perBrokerConfig) 
adminKRaftClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
+  // else adminKRaftClient.fetchEntityConfig(ConfigType.Broker, 
ConfigEntityName.Default)
+  // val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)

Review comment:
   I am not sure if the old config is retrieved for the validation step in 
addition to the persisting step in a ZK cluster.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679517755



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2740,23 +2798,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   }.toBuffer
 }.toMap
 
-val (authorizedResources, unauthorizedResources) = configs.partition { 
case (resource, _) =>
-  resource.`type` match {
-case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)
-case ConfigResource.Type.TOPIC =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC, 
resource.name)
-case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
-  }
-}
+if (!request.isForwarded && config.usesSelfManagedQuorum) {
+  // If using KRaft, per broker config alterations should be validated on 
the broker that the config(s) is for before forwarding them to the controller
+  val results = configs.map { case (resource, alterConfigOps) =>
+try {
+  if (resource.`type` == ConfigResource.Type.BROKER) {
+  // In ZK case, the old config is retrieved, altered then 
validated
+  // val persistentProps = if (perBrokerConfig) 
adminKRaftClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
+  // else adminKRaftClient.fetchEntityConfig(ConfigType.Broker, 
ConfigEntityName.Default)
+  // val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)

Review comment:
   I am not sure if the old config is retrieved for the validation step in 
addition to the persisting step in a ZK cluster. It would be good to know if I 
need to get the old config for the validation step so that I can do it for the 
KRaft case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679517755



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2740,23 +2798,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   }.toBuffer
 }.toMap
 
-val (authorizedResources, unauthorizedResources) = configs.partition { 
case (resource, _) =>
-  resource.`type` match {
-case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)
-case ConfigResource.Type.TOPIC =>
-  authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC, 
resource.name)
-case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
-  }
-}
+if (!request.isForwarded && config.usesSelfManagedQuorum) {
+  // If using KRaft, per broker config alterations should be validated on 
the broker that the config(s) is for before forwarding them to the controller
+  val results = configs.map { case (resource, alterConfigOps) =>
+try {
+  if (resource.`type` == ConfigResource.Type.BROKER) {
+  // In ZK case, the old config is retrieved, altered then 
validated
+  // val persistentProps = if (perBrokerConfig) 
adminKRaftClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
+  // else adminKRaftClient.fetchEntityConfig(ConfigType.Broker, 
ConfigEntityName.Default)
+  // val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)

Review comment:
   I am not sure if the old config is retrieved for the validation step in 
addition to the persisting step or just for the persisting step in a ZK 
cluster. Since these steps are coupled in the ZK case, it is hard to tell. It 
would be good to know if I need to get the old config for the validation step 
so that I can do it for the KRaft case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr opened a new pull request #11145: KAFKA-13151: KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread GitBox


dielhennr opened a new pull request #11145:
URL: https://github.com/apache/kafka/pull/11145


   alter.config.policy.class.name and create.topic.policy.class.name are 
unsupported by KRaft. KRaft servers should fail startup if any of these are 
configured.
   
   Tested this manually by enabling/disabling the configs statically in a KRaft 
cluster.
   
   https://issues.apache.org/jira/browse/KAFKA-13151
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r679531569



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
##
@@ -112,15 +103,15 @@ public void shouldReturnTopologyByName() {
 final NamedTopology topology2 = builder2.buildNamedTopology(props);
 final NamedTopology topology3 = builder3.buildNamedTopology(props);
 streams = new KafkaStreamsNamedTopologyWrapper(asList(topology1, 
topology2, topology3), props, clientSupplier);
-assertThat(streams.getTopologyByName("topology-1"), 
equalTo(topology1));
-assertThat(streams.getTopologyByName("topology-2"), 
equalTo(topology2));
-assertThat(streams.getTopologyByName("topology-3"), 
equalTo(topology3));
+assertThat(streams.getTopologyByName("topology-1").get(), 
equalTo(topology1));
+assertThat(streams.getTopologyByName("topology-2").get(), 
equalTo(topology2));
+assertThat(streams.getTopologyByName("topology-3").get(), 
equalTo(topology3));
 }
 
 @Test
-public void 
shouldThrowIllegalArgumentWhenLookingUpNonExistentTopologyByName() {
+public void shouldReturnEmptyWhenLookingUpNonExistentTopologyByName() {

Review comment:
   We changed the behavior to return an empty Optional rather than throw, 
as users may want to use this API to determine whether the given named topology 
is known or not




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13151) KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13151:
--
Fix Version/s: 3.0.0

> KRaft does not support Policies (e.g. AlterConfigPolicy). The server should 
> fail startup if any are configured.
> ---
>
> Key: KAFKA-13151
> URL: https://issues.apache.org/jira/browse/KAFKA-13151
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> alter.config.policy.class.name and create.topic.policy.class.name are 
> unsupported by KRaft. KRaft servers should fail startup if any of these are 
> configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #11145: KAFKA-13151: KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread GitBox


hachikuji commented on a change in pull request #11145:
URL: https://github.com/apache/kafka/pull/11145#discussion_r679533321



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2008,5 +2008,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
 require(principalBuilderClass != null, 
s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
 
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), 
   s"${KafkaConfig.PrincipalBuilderClassProp} must implement 
KafkaPrincipalSerde")
+
+if (usesSelfManagedQuorum) {
+  require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, 
"alter.config.policy.class.name is not supported in KRaft, please disable.")

Review comment:
   nit: can you use `KafkaConfig.AlterConfigPolicyClassNameProp` in the 
message instead of the explicit config name? Same for the other.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #11145: KAFKA-13151: KRaft does not support Policies (e.g. AlterConfigPolicy). The server should fail startup if any are configured.

2021-07-29 Thread GitBox


dielhennr commented on a change in pull request #11145:
URL: https://github.com/apache/kafka/pull/11145#discussion_r679539020



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2008,5 +2008,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
 require(principalBuilderClass != null, 
s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
 
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), 
   s"${KafkaConfig.PrincipalBuilderClassProp} must implement 
KafkaPrincipalSerde")
+
+if (usesSelfManagedQuorum) {
+  require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, 
"alter.config.policy.class.name is not supported in KRaft, please disable.")

Review comment:
   yes, sorry I thought I had changed this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679571374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.topologyName(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are named topologies but some are empty, this indicates a 
bug in user code
+if (hasNamedTopologies()) {
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   Ahh yeah I was even confusing myself with this after a while. "Local 
topology" sounds much better -- will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572637



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+streamsConfiguration.put(Stre

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572946



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+streamsConfiguration.put(Stre

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679581108



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+streamsConfiguration.put(Stre

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679589984



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability

Review comment:
   Yeah I think it would fine (better even) to swap in the topologies 
rather than the topology builders, the only reason for using the builders now 
is that a huge amount of topology-related functionality currently resides in 
the InternalTopologyBuilder, including pretty much all the metadata. I 100% 
would support cleaning this up and separating things out from this class and 
made sure it would be easy to do so here, the builders are really only kept 
around after the topology is built because they still contain most of the 
metadata we need




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679589984



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability

Review comment:
   Yeah I think it would fine (better even) to swap in the topologies 
rather than the topology builders, the only reason for using the builders now 
is that a huge amount of topology-related functionality currently resides in 
the InternalTopologyBuilder, including pretty much all the metadata. I 100% 
would [support cleaning this 
up](https://github.com/apache/kafka/pull/10683#discussion_r679581108) and 
separating things out from this class and made sure it would be easy to do so 
here, the builders are really only kept around after the topology is built 
because they still contain most of the metadata we need. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679596497



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   > Are you saying the CachingWindowStore internally uses a TimeWindow? 
   
   Yes, `CachingWindowStore` internally uses `TimeWindow` (i.e. 
`WindowKeySchema`). And looks like we use `TimeWinow` for `WindowStore`, and 
use `SessionWindow` for `SessionStore`.
   
   > doesn't this mean there's still a hole in the API since you can't use a 
custom WindowStore for a sliding windowed aggregation with the windowSize set 
to 0?
   
   I think so
   
   > If the WindowStore is going to represent different kinds of constant-size 
windows, it should probably be agnostic to the specific type of constant-sized 
window.
   
   Do you mean we should use `SessionWindow` (i.e. [start, end] inclusive time 
window) to represent the window? I'm not sure if this is the original design or 
just a miss. What do you think?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11146: KAFKA-12648: minor followup from Pt. 2 and some new tests

2021-07-29 Thread GitBox


ableegoldman opened a new pull request #11146:
URL: https://github.com/apache/kafka/pull/11146


   Addresses the handful of remaining feedback from [Pt. 
2](https://github.com/apache/kafka/pull/10683), plus adds two new tests: one 
verifying a multi-topology application with a FKJ and its internal topics, 
another to make sure IQ works with named topologies (though note that there is 
a bit more work left for IQ to be fully supported, will be tackled after [Pt. 
3](https://github.com/apache/kafka/pull/10788)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679607519



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   > Do you mean we should use SessionWindow (i.e. [start, end] inclusive 
time window) to represent the window?
   
   Not for the WindowStore (SessionWindow is fine for SessionStore of course), 
I'm saying if WindowStore is used for both inclusive-exclusive and also 
inclusive-inclusive windows, then we shouldn't ever assume one of them ie 
should not use an actual `TimeWindow` (or `InclusiveExclusiveWindow`) -- maybe 
we can just have a separate, plain window class that doesn't assume anything 
about its bounds.
   
   We can even just (re)use the `TimeWindow` for this if we do the renaming as 
discussed, and move the check that prevents size = 0 to that new class. Then we 
can just have a plain data container class `TimeWindow` that does nothing but 
hold the start and end time for use in window-agnostic cases like the 
CachingWindowStore. Does that make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11146: KAFKA-12648: minor followup from Pt. 2 and some new tests

2021-07-29 Thread GitBox


ableegoldman commented on pull request #11146:
URL: https://github.com/apache/kafka/pull/11146#issuecomment-889585580


   also cc @wcarlson5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11147: KAFKA-13108: improve ConfigCommandTest test coverage

2021-07-29 Thread GitBox


showuon opened a new pull request #11147:
URL: https://github.com/apache/kafka/pull/11147


   Follow up PR for this: 
https://github.com/apache/kafka/pull/10811#issuecomment-883408956, to improve 
`ConfigCommandTest` test coverage.
   
   Tests added:
   `shouldNotAllowAddEntityDefaultBrokerQuotaConfigWhileBrokerUpUsingZookeeper`
   `shouldNotAllowDescribeEntityDefaultBrokerWhileBrokerUpUsingZookeeper`
   `shouldSupportDescribeEntityDefaultBrokerBeforeBrokerUpUsingZookeeper`
   
   The `EntityDefault` Broker config update test is already in 
`testDynamicBrokerConfigUpdateUsingZooKeeper`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   >