[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2022-05-13 Thread GitBox


C0urante commented on PR #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-1126634597

   This will create other conflicts with 
https://github.com/apache/kafka/pull/11780. Would it be possible to prioritize 
that PR and then, once it's merged, fix the conflicts on this one and merge it?





[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread GitBox


franz1981 commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r872925840


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) {
     public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) {
         if (buffer.hasArray()) {
             checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length);
-        } else {
-            int start = buffer.position() + offset;
-            for (int i = start; i < start + length; i++)
-                checksum.update(buffer.get(i));
+            return;
+        }
+        if (BYTE_BUFFER_UPDATE != null) {
+            final int oldPosition = buffer.position();
+            final int oldLimit = buffer.limit();
+            try {
+                // save a slice to be used to save an allocation in the hot-path
+                final int start = buffer.position() + offset;
+                buffer.position(0);
+                buffer.limit(start + length);
+                buffer.position(start);
+                BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+                return;
+            } catch (Throwable t) {
+                // fallback

Review Comment:
   The problem is the existing `update` signature: `MethodHandle` doesn't know 
about the `update` methods and just assumes that any method can throw a generic 
`Throwable`, even if it won't happen (as is the case here).
   If I let the `Throwable` leak, given that it's a checked exception, it's 
going to poison the existing code using this method; I can wrap it in a generic 
`IllegalStateException` instead.
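
   For readers following along, a self-contained sketch of the wrapping pattern 
being discussed (the lookup code and the `ChecksumUpdateSketch` name are 
illustrative assumptions, not the PR's exact code):

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public final class ChecksumUpdateSketch {

    // Checksum.update(ByteBuffer) only exists on Java 9+, so it is looked up
    // reflectively; on Java 8 the handle stays null and callers fall back.
    private static final MethodHandle BYTE_BUFFER_UPDATE = lookupByteBufferUpdate();

    private static MethodHandle lookupByteBufferUpdate() {
        try {
            return MethodHandles.publicLookup().findVirtual(
                Checksum.class, "update", MethodType.methodType(void.class, ByteBuffer.class));
        } catch (NoSuchMethodException | IllegalAccessException e) {
            return null;
        }
    }

    static void update(final Checksum checksum, final ByteBuffer buffer) {
        try {
            // invokeExact is declared as "throws Throwable" no matter what the
            // target method can actually throw, so the checked Throwable must
            // be handled here or it poisons every caller's signature.
            BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
        } catch (Throwable t) {
            // wrap in an unchecked exception instead of leaking Throwable
            throw new IllegalStateException(t);
        }
    }

    public static void main(final String[] args) {
        final Checksum crc = new CRC32();
        update(crc, ByteBuffer.wrap(new byte[]{1, 2, 3}));
        System.out.println(crc.getValue());
    }
}
```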






[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread GitBox


franz1981 commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r872924916


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) {
     public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) {
         if (buffer.hasArray()) {
             checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length);
-        } else {
-            int start = buffer.position() + offset;
-            for (int i = start; i < start + length; i++)
-                checksum.update(buffer.get(i));
+            return;
+        }
+        if (BYTE_BUFFER_UPDATE != null) {
+            final int oldPosition = buffer.position();
+            final int oldLimit = buffer.limit();
+            try {
+                // save a slice to be used to save an allocation in the hot-path
+                final int start = buffer.position() + offset;
+                buffer.position(0);
+                buffer.limit(start + length);
+                buffer.position(start);
+                BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+                return;
+            } catch (Throwable t) {
+                // fallback
+            } finally {
+                // reset buffer's offsets
+                buffer.position(0);

Review Comment:
   I've just read the `ByteBuffer` doc: I had moved `position` first to keep the 
`limit` change from throwing an exception, but I see that it won't happen... let 
me remove this, good point :+1: 
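
   For context, a minimal sketch of the save/restore pattern without the extra 
`position(0)` call (`BufferWindowSketch`/`updateRange` are hypothetical names, 
not the PR's code; it needs Java 9+ for `Checksum.update(ByteBuffer)`):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public final class BufferWindowSketch {

    // Temporarily narrow the buffer to [start, start + length), checksum it,
    // then restore the caller's position/limit in a finally block.
    static void updateRange(final Checksum checksum, final ByteBuffer buffer,
                            final int offset, final int length) {
        final int oldPosition = buffer.position();
        final int oldLimit = buffer.limit();
        final int start = oldPosition + offset;
        try {
            // limit(int) only throws if the new limit is negative or exceeds
            // the capacity; if it is smaller than the current position, the
            // position is silently clamped, so no position(0) call is needed.
            buffer.limit(start + length);
            buffer.position(start);
            checksum.update(buffer);
        } finally {
            // restore limit before position: position must never exceed limit
            buffer.limit(oldLimit);
            buffer.position(oldPosition);
        }
    }

    public static void main(final String[] args) {
        final ByteBuffer buf = ByteBuffer.allocateDirect(8)
            .put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
        buf.position(2);
        final Checksum crc = new CRC32();
        updateRange(crc, buf, 1, 4); // checksums absolute indices 3..6
        System.out.println(crc.getValue() + ", position restored: " + (buf.position() == 2));
    }
}
```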






[jira] [Resolved] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13745.
---
Resolution: Fixed

> Flaky 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> -
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
>   at 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}





[GitHub] [kafka] guozhangwang merged pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang merged PR #12135:
URL: https://github.com/apache/kafka/pull/12135





[jira] [Commented] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-05-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536959#comment-17536959
 ] 

Guozhang Wang commented on KAFKA-13745:
---

Hi [~sagarrao], I'm happy to resolve the ticket now, and if we see it again we 
can re-create it.

> Flaky 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> -
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
>   at 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}





[jira] [Resolved] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13800.
---
Resolution: Fixed

> Remove force cast of TimeWindowKStreamImpl in tests of 
> https://github.com/apache/kafka/pull/11896
> -
>
> Key: KAFKA-13800
> URL: https://issues.apache.org/jira/browse/KAFKA-13800
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> We can remove the cast after `emitStrategy` is added to the public API





[GitHub] [kafka] guozhangwang commented on pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on PR #12135:
URL: https://github.com/apache/kafka/pull/12135#issuecomment-1126616157

   Merged to trunk.





[GitHub] [kafka] guozhangwang closed pull request #12037: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang closed pull request #12037: KAFKA-13785: [7/N][Emit final] emit 
final for sliding window
URL: https://github.com/apache/kafka/pull/12037





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872914073


##
streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class SlidingWindowedKStreamIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+mkProperties(
+mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+)
+);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String streamOneInput;
+private String streamTwoInput;
+private String outputTopic;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameter
+public StrategyType type;
+
+@Parameter(1)
+public boolean 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872913253


##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java:
##
@@ -0,0 +1,1238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofHours;
+import static java.time.Duration.ofMinutes;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues;
+import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
+import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872912670


##
streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class SlidingWindowedKStreamIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+mkProperties(
+mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+)
+);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String streamOneInput;
+private String streamTwoInput;
+private String outputTopic;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameter
+public StrategyType type;
+
+@Parameter(1)
+public boolean 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872912604


##
streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class SlidingWindowedKStreamIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+mkProperties(
+mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+)
+);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String streamOneInput;
+private String streamTwoInput;
+private String outputTopic;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameter
+public StrategyType type;
+
+@Parameter(1)
+public boolean 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872912257


##
streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class SlidingWindowedKStreamIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+mkProperties(
+mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+)
+);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String streamOneInput;
+private String streamTwoInput;
+private String outputTopic;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameter
+public StrategyType type;
+
+@Parameter(1)
+public boolean 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode

2022-05-13 Thread GitBox


hachikuji commented on code in PR #12109:
URL: https://github.com/apache/kafka/pull/12109#discussion_r872906546


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2033,25 +2039,40 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   LogConfig.CompressionTypeProp -> "producer"
 ).asJava
 val newTopic = new NewTopic(topic, 2, brokerCount.toShort)
-val e1 = assertThrows(classOf[ExecutionException],
-  () => client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all.get())
-assertTrue(e1.getCause.isInstanceOf[InvalidRequestException],
-  s"Unexpected exception ${e1.getCause.getClass}")
+assertFutureExceptionTypeEquals(
+  client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all,
+  classOf[InvalidRequestException],
+  Some("Null value not supported for topic configs : retention.bytes")
+)
 
-val validConfigs = Map[String, String](LogConfig.CompressionTypeProp -> "producer").asJava
-client.createTopics(Collections.singletonList(newTopic.configs(validConfigs))).all.get()
+val validConfigs = Map[String, String](LogConfig.CompressionTypeProp -> "producer")
+client.createTopics(Collections.singletonList(newTopic.configs(validConfigs.asJava))).all.get()
 waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List())
+// Only need to wait for kraft brokers
+if (isKRaftTest()) {

Review Comment:
   Could we use the new `ensureConsistentKRaftMetadata` function?



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -773,10 +773,18 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         for (CreatableTopic topic : topics) {
             if (topicErrors.containsKey(topic.name())) continue;
             Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>();
+            List<String> nullConfigs = new ArrayList<>();
             for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
-                topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+                if (config.value() == null) {
+                    nullConfigs.add(config.name());
+                } else {
+                    topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+                }
             }
-            if (!topicConfigs.isEmpty()) {
+            if (!nullConfigs.isEmpty()) {
+                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_REQUEST,

Review Comment:
   On a related note, I opened https://github.com/apache/kafka/pull/12162. I 
wonder if we should call this INVALID_CONFIG instead?



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -773,10 +773,18 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         for (CreatableTopic topic : topics) {
             if (topicErrors.containsKey(topic.name())) continue;
             Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>();
+            List<String> nullConfigs = new ArrayList<>();
             for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
-                topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+                if (config.value() == null) {
+                    nullConfigs.add(config.name());
+                } else {
+                    topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+                }
             }
-            if (!topicConfigs.isEmpty()) {
+            if (!nullConfigs.isEmpty()) {
+                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_REQUEST,
+                    "Null value not supported for topic configs : " + String.join(",", nullConfigs)));

Review Comment:
   nit: no space before colon?
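
   For anyone skimming, a standalone sketch of the null-value check under review 
(hypothetical names; the real code lives in `ReplicationControlManager` and 
reports the error through `ApiError`). It also adopts the nit above by dropping 
the space before the colon:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class NullConfigCheckSketch {

    // Returns an error message naming every null-valued config, or null if
    // all the requested configs are usable.
    static String validate(final Map<String, String> requestedConfigs) {
        final List<String> nullConfigs = new ArrayList<>();
        final Map<String, String> accepted = new HashMap<>();
        for (final Map.Entry<String, String> config : requestedConfigs.entrySet()) {
            if (config.getValue() == null) {
                nullConfigs.add(config.getKey()); // reject instead of failing later
            } else {
                accepted.put(config.getKey(), config.getValue());
            }
        }
        return nullConfigs.isEmpty()
            ? null
            : "Null value not supported for topic configs: " + String.join(",", nullConfigs);
    }

    public static void main(final String[] args) {
        final Map<String, String> configs = new HashMap<>();
        configs.put("compression.type", "producer");
        configs.put("retention.bytes", null);
        // prints: Null value not supported for topic configs: retention.bytes
        System.out.println(validate(configs));
    }
}
```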






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872905269


##
streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class SlidingWindowedKStreamIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+mkProperties(
+mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+)
+);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String streamOneInput;
+private String streamTwoInput;
+private String outputTopic;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameter
+public StrategyType type;

Review Comment:
   Ack!




[GitHub] [kafka] cmccabe merged pull request #12155: MINOR: convert some tests to KRaft

2022-05-13 Thread GitBox


cmccabe merged PR #12155:
URL: https://github.com/apache/kafka/pull/12155





[GitHub] [kafka] mjsax merged pull request #12122: Upgrade tests for KAFKA-13769

2022-05-13 Thread GitBox


mjsax merged PR #12122:
URL: https://github.com/apache/kafka/pull/12122





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872901962


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##
@@ -103,59 +89,12 @@ public void enableSendingOldValues() {
 sendOldValues = true;
 }
 
-
-private class KStreamWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
-private TimestampedWindowStore<KIn, VAgg> windowStore;
-private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
-private Sensor droppedRecordsSensor;
-private Sensor emittedRecordsSensor;
-private Sensor emitFinalLatencySensor;
+private class KStreamWindowAggregateProcessor extends AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg> {
 private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

Review Comment:
   Ack!






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872901603


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -467,13 +459,33 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> rightWinAgg,
 return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp;
 }
 
+@Override
+protected long emitRangeLowerBound(final long windowCloseTime) {
+return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+0L : Math.max(0L, lastEmitWindowCloseTime - windows.timeDifferenceMs());
+}
+
+@Override
+protected long emitRangeUpperBound(final long windowCloseTime) {
+// Sliding window's start and end timestamps are inclusive, so
+// we should minus 1 for the inclusive closed window-end upper bound
+return windowCloseTime - windows.timeDifferenceMs() - 1;

Review Comment:
   It can actually be negative, and we would skip the range fetching in that 
case.
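
   To illustrate, a standalone sketch of the bound arithmetic (class name and 
sample values are made up, not the PR's code):

```java
public final class EmitRangeSketch {

    // Sliding-window start/end timestamps are inclusive, hence the -1.
    static long emitRangeUpperBound(final long windowCloseTime, final long timeDifferenceMs) {
        return windowCloseTime - timeDifferenceMs - 1;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 500;
        // Early in the stream the close time can be smaller than the window
        // size, so the upper bound goes negative and the range fetch is skipped.
        for (final long windowCloseTime : new long[]{100, 501, 2_000}) {
            final long upper = emitRangeUpperBound(windowCloseTime, timeDifferenceMs);
            System.out.println("close=" + windowCloseTime + " upper=" + upper
                + (upper < 0 ? " -> skip range fetch" : " -> fetch up to " + upper));
        }
    }
}
```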






[GitHub] [kafka] guozhangwang merged pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-13 Thread GitBox


guozhangwang merged PR #12104:
URL: https://github.com/apache/kafka/pull/12104





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872900761


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -50,22 +46,32 @@
 private final SlidingWindows windows;
 private final Initializer<VAgg> initializer;
 private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
+private final EmitStrategy emitStrategy;
 
 private boolean sendOldValues = false;
 
 public KStreamSlidingWindowAggregate(final SlidingWindows windows,
  final String storeName,
  final Initializer<VAgg> initializer,
  final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
+this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator);

Review Comment:
   I replied in the other comment that this is now only used in `cogroup`, which 
does not have emit-on-final yet, but I guess we can always just call it 
explicitly.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872900375


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg> extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
+
+private final Time time = Time.SYSTEM;
+private final String storeName;
+private final EmitStrategy emitStrategy;
+private final boolean sendOldValues;
+protected final TimeTracker timeTracker = new TimeTracker();
+
+private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
+protected TimestampedWindowStore<KIn, VAgg> windowStore;
+protected Sensor droppedRecordsSensor;
+protected Sensor emittedRecordsSensor;
+protected Sensor emitFinalLatencySensor;
+protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
+protected InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+
+protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName,
+  final EmitStrategy emitStrategy,
+  final boolean sendOldValues) {
+this.storeName = storeName;
+this.emitStrategy = emitStrategy;
+this.sendOldValues = sendOldValues;
+}
+
+@Override
+public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
+final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
+internalProcessorContext.currentNode().name(), metrics);
+emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
+internalProcessorContext.currentNode().name(), metrics);
+windowStore = context.getStateStore(storeName);
+
+if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+// Restore last emit close time for ON_WINDOW_CLOSE strategy
+final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName);
+  

[GitHub] [kafka] guozhangwang commented on pull request #12035: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-05-13 Thread GitBox


guozhangwang commented on PR #12035:
URL: https://github.com/apache/kafka/pull/12035#issuecomment-1126589067

   Thanks @sayantanu-dey, I will review asap.





[jira] [Resolved] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13746.
---
Resolution: Fixed

> Flaky 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
> 
>
> Key: KAFKA-13746
> URL: https://issues.apache.org/jira/browse/KAFKA-13746
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>   at 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12104:
URL: https://github.com/apache/kafka/pull/12104#discussion_r872898659


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -732,7 +733,18 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 try {
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-  TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
+
+  if (isKRaftTest()) {
+TestUtils.ensureConsistentKRaftMetadata(aliveServers, 
controllerServer, "Timeout waiting for topic configs propagating to brokers")
+  } else {
+TestUtils.waitUntilTrue(
+  () => aliveServers.forall(
+broker =>
+  broker.metadataCache.getPartitionInfo(underMinIsrTopic, 
0).get.isr().size() < 6 &&
+broker.metadataCache.getPartitionInfo(offlineTopic, 
0).get.leader() == MetadataResponse.NO_LEADER_ID),

Review Comment:
   Thanks for the explanation @dengziming !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536926#comment-17536926
 ] 

Matthias J. Sax commented on KAFKA-13817:
-

> as it will eventually self-correct and continue to throttle, I believe how 
> fast it self-corrects depends on the magnitude of the clock drift and the 
> emitIntervalMs value.

That's exactly the point. We want to avoid throttling only eventually in this 
case; we want to keep throttling right away. Thus, instead of just computing 
"next = next + X" we want to compute "next = now + X" to quickly fast-forward 
in case we missed an interval.

We do similar thing in the windowed aggregation: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L260-L269]
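
For illustration, a minimal sketch of this fast-forward throttle (the class and 
method names here are made up for the example; they are not the actual Kafka 
Streams types):
{code:java}
// A large system-time jump costs at most one extra emit, instead of one
// emit per record until the schedule catches up.
public class EmitThrottle {
    private final long emitIntervalMs;
    private long nextTimeToEmit = 0L;

    public EmitThrottle(final long emitIntervalMs) {
        this.emitIntervalMs = emitIntervalMs;
    }

    // returns true when an emit is allowed at wall-clock time nowMs
    public boolean maybeEmit(final long nowMs) {
        if (nowMs < nextTimeToEmit) {
            return false;                        // still throttled
        }
        nextTimeToEmit = nowMs + emitIntervalMs; // "next = now + X", not "next = next + X"
        return true;
    }
}
{code}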

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Lim Qing Wei
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] xjin-Confluent opened a new pull request, #12164: Update upgrade.html

2022-05-13 Thread GitBox


xjin-Confluent opened a new pull request, #12164:
URL: https://github.com/apache/kafka/pull/12164

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-13 Thread Lim Qing Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536895#comment-17536895
 ] 

Lim Qing Wei commented on KAFKA-13817:
--

Hi [~lihaosky], I am new to contributing to Kafka and wish to understand the 
issue better.

Do you mean that we might not throttle properly when clock drift happens, 
because it relies on wall-clock time?

I am not sure I fully understand your example. Are you saying that we should 
set `sharedTimeTracker.nextTimeToEmit` to system time on every method 
invocation?

I notice we always invoke the following if we don't throttle:
{code:java}
sharedTimeTracker.advanceNextTimeToEmit(){code}

This should mitigate the clock drift issue, right? As it will eventually 
self-correct and continue to throttle; I believe how fast it self-corrects 
depends on the magnitude of the clock drift and the emitIntervalMs value.

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Lim Qing Wei
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Reopened] (KAFKA-13502) Support configuring BROKER_LOGGER on controller-only KRaft nodes

2022-05-13 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reopened KAFKA-13502:


I accidentally resolved this issue.

> Support configuring BROKER_LOGGER on controller-only KRaft nodes
> 
>
> Key: KAFKA-13502
> URL: https://issues.apache.org/jira/browse/KAFKA-13502
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-13 Thread Lim Qing Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lim Qing Wei reassigned KAFKA-13817:


Assignee: Lim Qing Wei

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Lim Qing Wei
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13903) Add queue size metric to QuorumController

2022-05-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13903:
-
Labels: kip-500  (was: )

> Add queue size metric to QuorumController
> -
>
> Key: KAFKA-13903
> URL: https://issues.apache.org/jira/browse/KAFKA-13903
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: David Arthur
>Priority: Major
>  Labels: kip-500
>
> In order to stay in-line with existing queue metrics in 
> ControllerEventManager mbean, we need to include a queue size. The current 
> size of the underlying queue in QuorumController needs to be exposed as: 
> {{kafka.controller:type=ControllerEventManager,name=EventQueueSize}}
> It looks like KafkaEventQueue will need to add support for reporting its size.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft

2022-05-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13902:
-
Labels: kip-500  (was: )

> Support online metadata.version upgrades for KIP-704 in KRaft 
> --
>
> Key: KAFKA-13902
> URL: https://issues.apache.org/jira/browse/KAFKA-13902
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: David Arthur
>Priority: Minor
>  Labels: kip-500
>
> KIP-704 landed in trunk before the KIP-778 metadata.version work that is now 
> in progress. The KIP-704 (unclean leader recovery) code uses the usual IBP 
> approach for enabling the feature on startup.
> Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal 
> with online upgrades. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13903) Add queue size metric to QuorumController

2022-05-13 Thread David Arthur (Jira)
David Arthur created KAFKA-13903:


 Summary: Add queue size metric to QuorumController
 Key: KAFKA-13903
 URL: https://issues.apache.org/jira/browse/KAFKA-13903
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: David Arthur


In order to stay in-line with existing queue metrics in ControllerEventManager 
mbean, we need to include a queue size. The current size of the underlying 
queue in QuorumController needs to be exposed as: 
{{kafka.controller:type=ControllerEventManager,name=EventQueueSize}}

It looks like KafkaEventQueue will need to add support for reporting its size.
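
A hedged sketch of registering such a gauge with the Yammer metrics library 
that Kafka uses elsewhere (the size() accessor on KafkaEventQueue is an 
assumption; per the above it still needs to be added):
{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import org.apache.kafka.queue.KafkaEventQueue;

public class ControllerQueueMetrics {
    // kafka.controller:type=ControllerEventManager,name=EventQueueSize
    public static void register(final KafkaEventQueue queue) {
        Metrics.defaultRegistry().newGauge(
            new MetricName("kafka.controller", "ControllerEventManager", "EventQueueSize"),
            new Gauge<Integer>() {
                @Override
                public Integer value() {
                    return queue.size(); // hypothetical size() accessor
                }
            });
    }
}
{code}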



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft

2022-05-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13902:
-
Component/s: controller

> Support online metadata.version upgrades for KIP-704 in KRaft 
> --
>
> Key: KAFKA-13902
> URL: https://issues.apache.org/jira/browse/KAFKA-13902
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: David Arthur
>Priority: Minor
>
> KIP-704 landed in trunk before the KIP-778 metadata.version work that is now 
> in progress. The KIP-704 (unclean leader recovery) code uses the usual IBP 
> approach for enabling the feature on startup.
> Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal 
> with online upgrades. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft

2022-05-13 Thread David Arthur (Jira)
David Arthur created KAFKA-13902:


 Summary: Support online metadata.version upgrades for KIP-704 in 
KRaft 
 Key: KAFKA-13902
 URL: https://issues.apache.org/jira/browse/KAFKA-13902
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Arthur


KIP-704 landed in trunk before the KIP-778 metadata.version work that is now in 
progress. The KIP-704 (unclean leader recovery) code uses the usual IBP 
approach for enabling the feature on startup.

Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal 
with online upgrades. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ijuma commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread GitBox


ijuma commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r872799071


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {
 if (buffer.hasArray()) {
 checksum.update(buffer.array(), buffer.position() + 
buffer.arrayOffset() + offset, length);
-} else {
-int start = buffer.position() + offset;
-for (int i = start; i < start + length; i++)
-checksum.update(buffer.get(i));
+return;
+}
+if (BYTE_BUFFER_UPDATE != null) {
+final int oldPosition = buffer.position();
+final int oldLimit = buffer.limit();
+try {
+// save a slice to be used to save an allocation in the 
hot-path
+final int start = buffer.position() + offset;
+buffer.position(0);
+buffer.limit(start + length);
+buffer.position(start);
+BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+return;
+} catch (Throwable t) {
+// fallback

Review Comment:
   Shouldn't we propagate the exception instead of falling back?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread GitBox


ijuma commented on code in PR #12163:
URL: https://github.com/apache/kafka/pull/12163#discussion_r872795794


##
clients/src/main/java/org/apache/kafka/common/utils/Checksums.java:
##
@@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer 
buffer, int length) {
 public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {
 if (buffer.hasArray()) {
 checksum.update(buffer.array(), buffer.position() + 
buffer.arrayOffset() + offset, length);
-} else {
-int start = buffer.position() + offset;
-for (int i = start; i < start + length; i++)
-checksum.update(buffer.get(i));
+return;
+}
+if (BYTE_BUFFER_UPDATE != null) {
+final int oldPosition = buffer.position();
+final int oldLimit = buffer.limit();
+try {
+// save a slice to be used to save an allocation in the 
hot-path
+final int start = buffer.position() + offset;
+buffer.position(0);
+buffer.limit(start + length);
+buffer.position(start);
+BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+return;
+} catch (Throwable t) {
+// fallback
+} finally {
+// reset buffer's offsets
+buffer.position(0);

Review Comment:
   Is it intentional that we are calling `position` twice in the `finally` 
block?
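
   For context, a hedged sketch of the save-and-restore pattern under 
discussion (illustrative, not the PR's exact code):
   ```java
   import java.nio.ByteBuffer;
   import java.util.zip.Checksum;

   // Temporarily re-window the buffer for the Java 9+ update(ByteBuffer)
   // overload, then restore the caller's position and limit.
   static void updateWindow(Checksum checksum, ByteBuffer buffer, int start, int length) {
       final int oldPosition = buffer.position();
       final int oldLimit = buffer.limit();
       try {
           buffer.position(0);          // clean slate before moving limit/position
           buffer.limit(start + length);
           buffer.position(start);      // window = [start, start + length)
           checksum.update(buffer);     // consumes the whole remaining window
       } finally {
           buffer.position(0);          // mirror of the setup; then restore
           buffer.limit(oldLimit);
           buffer.position(oldPosition);
       }
   }
   ```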



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-05-13 Thread GitBox


jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r872767805


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
 }
 // we can always let changelog reader try restoring in order to 
initialize the changelogs;
 // if there's no active restoring or standby updating it would not try 
to fetch any data
-changelogReader.restore(taskManager.tasks());
+// After KAFKA-13873, we only restore the not paused tasks.
+changelogReader.restore(taskManager.notPausedTasks());

Review Comment:
   "Yes."
   
   The next time that a resumed task goes through the loop it will be returned 
in `notPausedTasks()` and `restore` will be called.  (I say "Yes" because 
that's not a separate, direct call to `restore` when resuming.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-05-13 Thread GitBox


jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r872766397


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -463,7 +463,10 @@ public void restore(final Map<TaskId, Task> tasks) {
 final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
 try {
 if (restoreChangelog(changelogs.get(partition))) {
-tasks.get(taskId).clearTaskTimeout();
+final Task task = tasks.get(taskId);
+if (task != null) {

Review Comment:
   The taskId is coming from the changeLogs partition info a few lines above.  
The tasks map returned is now smaller and does not contain all the keys anymore!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-05-13 Thread GitBox


wcarlson5 commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r872731431


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -463,7 +463,10 @@ public void restore(final Map<TaskId, Task> tasks) {
 final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
 try {
 if (restoreChangelog(changelogs.get(partition))) {
-tasks.get(taskId).clearTaskTimeout();
+final Task task = tasks.get(taskId);
+if (task != null) {

Review Comment:
   Why are we getting null here when we were not before?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -273,6 +273,12 @@ Collection<Task> allTasks() {
 return readOnlyTasks;
 }
 
+Collection<Task> notPausedTasks() {
+return 
Collections.unmodifiableCollection(readOnlyActiveTasks.stream().filter(t ->

Review Comment:
   nit: Can we align each call on its own line for readability?



##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import 

[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-13 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r872705006


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   Yes, then we just need to pass in a single endpoint to 
AbstractFetcherThread. This seems simpler, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed

2022-05-13 Thread GitBox


guozhangwang commented on PR #12041:
URL: https://github.com/apache/kafka/pull/12041#issuecomment-1126350779

   Ah yes, that's clear. 
   
   My concern was that it assumes all of the defined properties are retrieved 
in the constructor (since `logUnused` is called at the end of it). I think 
that's true for most clients, at least the producer and consumer, but it may 
not be the case for streams.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] franz1981 commented on pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread GitBox


franz1981 commented on PR #12163:
URL: https://github.com/apache/kafka/pull/12163#issuecomment-1126318782

   I've put it in draft mode in order to better format it according to the 
community's rules, but feel free to review it: content-wise it should be done.
   
   Some numbers, for JDK 11:
   ```
   Benchmark                 (bytes)  (direct)  (readonly)  (seed)   Mode  Cnt   Score   Error   Units
   Crc32CBenchamrk.checksum      128     false       false      42  thrpt   20  26.922 ± 0.065  ops/us
   Crc32CBenchamrk.checksum      128     false        true      42  thrpt   20   9.321 ± 0.031  ops/us
   Crc32CBenchamrk.checksum      128      true       false      42  thrpt   20  24.656 ± 0.620  ops/us
   Crc32CBenchamrk.checksum      128      true        true      42  thrpt   20  24.429 ± 0.668  ops/us
   Crc32CBenchamrk.checksum     1024     false       false      42  thrpt   20   6.548 ± 0.025  ops/us
   Crc32CBenchamrk.checksum     1024     false        true      42  thrpt   20   3.603 ± 0.087  ops/us
   Crc32CBenchamrk.checksum     1024      true       false      42  thrpt   20   6.432 ± 0.136  ops/us
   Crc32CBenchamrk.checksum     1024      true        true      42  thrpt   20   6.499 ± 0.042  ops/us
   Crc32CBenchamrk.checksum     4096     false       false      42  thrpt   20   4.031 ± 0.022  ops/us
   Crc32CBenchamrk.checksum     4096     false        true      42  thrpt   20   1.511 ± 0.009  ops/us
   Crc32CBenchamrk.checksum     4096      true       false      42  thrpt   20   4.004 ± 0.016  ops/us
   Crc32CBenchamrk.checksum     4096      true        true      42  thrpt   20   4.002 ± 0.020  ops/us
   ```
   The purpose of the PR is to make heap and direct ByteBuffers perform the 
same (especially non-read-only ones).
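
   For reference, a hedged sketch of how the Java 9 `Checksum.update(ByteBuffer)` 
overload can be resolved so the code still runs on Java 8 (illustrative, not 
necessarily the PR's exact lookup):
   ```java
   import java.lang.invoke.MethodHandle;
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.MethodType;
   import java.nio.ByteBuffer;
   import java.util.zip.Checksum;

   static MethodHandle byteBufferUpdateOrNull() {
       try {
           // resolves Checksum.update(ByteBuffer), added in Java 9
           return MethodHandles.publicLookup().findVirtual(
               Checksum.class, "update",
               MethodType.methodType(void.class, ByteBuffer.class));
       } catch (NoSuchMethodException | IllegalAccessException e) {
           return null; // Java 8: caller falls back to byte-per-byte updates
       }
   }
   ```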


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13901) Exactly once producer cannot start due to TimeoutException: Timeout expired after 300000ms awaiting InitProducerId

2022-05-13 Thread Ben Augarten (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Augarten updated KAFKA-13901:
-
Description: 
I'm currently running into this problem running org.apache.kafka:kafka and 
org.apache.kafka:kafka-client version 2.5.1.  

 

The symptoms of this problem are very similar to 
https://issues.apache.org/jira/browse/KAFKA-8803

 

I'm currently running a test that runs one embedded Kafka broker and an 
embedded Flink cluster (version 1.13). The Flink application uses an exactly 
once Kafka producer. When initializing, the FlinkKafkaProducer calls 
[initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285]
 for each of ~50 underlying KafkaProducers. On some executions of the test (but 
not all of them), one of these calls to InitProducerId times out. Based on my 
reading of the logs and some debugging sessions, it seems that the failing 
producer continually tries to issue FindCoordinatorRequests, but 
[Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526]
 returns null. It seems like every time 
[NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662]
 returns null and prints that the one broker "is neither ready for sending or 
connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]).

I set the producer max.block.ms to be 5mins and you can see in the logs that 
this goes on and on for the full 5 minutes. During this time, the broker 
appears healthy (and even serves other requests). It seems that the client is 
not even attempting to reconnect to the broker during this 5 minutes, though I 
truthfully don't understand what could be going wrong looking through the code. 
Do you have any ideas? Any more information I could provide? 


  was:
I'm currently running into this problem running org.apache.kafka:kafka and 
org.apache.kafka:kafka-client version 2.5.1.  

 

The symptoms of this problem are very similar to 
https://issues.apache.org/jira/browse/KAFKA-8803

 

I'm currently running a test that runs one embedded Kafka broker and an 
embedded Flink cluster (version 1.13). The Flink application uses an exactly 
once Kafka producer. When initializing, the FlinkKafkaProducer calls 
[initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285]
 for each of ~50 underlying KafkaProducers. On some executions of the test (but 
not all of them), one of these calls to InitProducerId times out. Based on my 
reading of the logs and some debugging sessions, it seems that the failing 
producer continually tries to issue FindCoordinatorRequests, but 
[Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526]
 returns null. It seems like every time 
[NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662]
 returns null prints that the one broker "is neither ready for sending or 
connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]).

I set the producer max.block.ms to be 5mins and you can see in the logs that 
this goes on and on for the full 5 minutes. During this time, the broker 
appears healthy (and even serves other requests). It seems that the client is 
not even attempting to reconnect to the broker during this 5 minutes, though I 
truthfully don't understand what could be going wrong looking through the code. 
Do you have any ideas? Any more information I could provide? 



> Exactly once producer cannot start due to TimeoutException: Timeout expired 
> after 300000ms awaiting InitProducerId
> --
>
> Key: KAFKA-13901
> URL: https://issues.apache.org/jira/browse/KAFKA-13901
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.1
>Reporter: Ben Augarten
>Priority: Major
> Attachments: broker-logs-renamed-topics, client-logs-truncated
>
>
> I'm currently running into this problem running org.apache.kafka:kafka and 
> org.apache.kafka:kafka-client version 2.5.1.  
>  
> The symptoms of this problem are very similar to 
> 

[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread Francesco Nigro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Nigro updated KAFKA-13900:

Description: 
Java 9 added a new Checksum method that can make use of ByteBuffer(s) 
(see [Java 9's 
Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):
 Kafka already provides specific support for Java 9's CRC32C, hence it makes 
sense to use the most optimized version of it for direct ByteBuffers as well 
(read-only or not), instead of performing a byte-per-byte computation.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.

  was:
Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see [Java 9's 
Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):]
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.


> Support Java 9 direct ByteBuffer Checksum methods
> -
>
> Key: KAFKA-13900
> URL: https://issues.apache.org/jira/browse/KAFKA-13900
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.1.1
>Reporter: Francesco Nigro
>Priority: Minor
>  Labels: performance, performance-benchmark
>
> Java 9 added a new Checksum method that can make use of ByteBuffer(s) 
> (see [Java 9's 
> Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):
>  Kafka already provides specific support for Java 9's CRC32C, hence it makes 
> sense to use the most optimized version of it for direct 
> ByteBuffers as well (read-only or not), instead of performing a byte-per-byte 
> computation.
>  
> I'm aware that currently the client's Buffer pools aren't using direct 
> ByteBuffer, but having full support for it can open the door to future 
> interesting optimizations on it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13901) Exactly once producer cannot start due to TimeoutException: Timeout expired after 300000ms awaiting InitProducerId

2022-05-13 Thread Ben Augarten (Jira)
Ben Augarten created KAFKA-13901:


 Summary: Exactly once producer cannot start due to 
TimeoutException: Timeout expired after 300000ms awaiting InitProducerId
 Key: KAFKA-13901
 URL: https://issues.apache.org/jira/browse/KAFKA-13901
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.1
Reporter: Ben Augarten
 Attachments: broker-logs-renamed-topics, client-logs-truncated

I'm currently running into this problem running org.apache.kafka:kafka and 
org.apache.kafka:kafka-client version 2.5.1.  

 

The symptoms of this problem are very similar to 
https://issues.apache.org/jira/browse/KAFKA-8803

 

I'm currently running a test that runs one embedded Kafka broker and an 
embedded Flink cluster (version 1.13). The Flink application uses an exactly 
once Kafka producer. When initializing, the FlinkKafkaProducer calls 
[initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285]
 for each of ~50 underlying KafkaProducers. On some executions of the test (but 
not all of them), one of these calls to InitProducerId times out. Based on my 
reading of the logs and some debugging sessions, it seems that the failing 
producer continually tries to issue FindCoordinatorRequests, but 
[Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526]
 returns null. It seems like every time 
[NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662]
 returns null prints that the one broker "is neither ready for sending or 
connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]).

I set the producer max.block.ms to be 5mins and you can see in the logs that 
this goes on and on for the full 5 minutes. During this time, the broker 
appears healthy (and even serves other requests). It seems that the client is 
not even attempting to reconnect to the broker during this 5 minutes, though I 
truthfully don't understand what could be going wrong looking through the code. 
Do you have any ideas? Any more information I could provide? 




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread Francesco Nigro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Nigro updated KAFKA-13900:

Description: 
Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see [[Java 9's 
Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):]
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.

  was:
Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see 
https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.


> Support Java 9 direct ByteBuffer Checksum methods
> -
>
> Key: KAFKA-13900
> URL: https://issues.apache.org/jira/browse/KAFKA-13900
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.1.1
>Reporter: Francesco Nigro
>Priority: Minor
>  Labels: performance, performance-benchmark
>
> Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
> (see [[Java 9's 
> Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):]
>  kafka already provides specific support for Java 9's Cr32C, hence it makes 
> sense it's going to use the most optimized version of it, in case fed 
> ByteBuffer is direct (read-only or not), instead of performing a computation 
> byte-per-byte.
>  
> I'm aware that currently the client's Buffer pools aren't using direct 
> ByteBuffer, but having full support for it can open the door to future 
> interesting optimizations on it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread Francesco Nigro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francesco Nigro updated KAFKA-13900:

Description: 
Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see [Java 9's 
Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):]
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.

  was:
Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see [[Java 9's 
Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):]
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.


> Support Java 9 direct ByteBuffer Checksum methods
> -
>
> Key: KAFKA-13900
> URL: https://issues.apache.org/jira/browse/KAFKA-13900
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.1.1
>Reporter: Francesco Nigro
>Priority: Minor
>  Labels: performance, performance-benchmark
>
> Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
> (see [Java 9's 
> Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):]
>  kafka already provides specific support for Java 9's Cr32C, hence it makes 
> sense it's going to use the most optimized version of it, in case fed 
> ByteBuffer is direct (read-only or not), instead of performing a computation 
> byte-per-byte.
>  
> I'm aware that currently the client's Buffer pools aren't using direct 
> ByteBuffer, but having full support for it can open the door to future 
> interesting optimizations on it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods

2022-05-13 Thread Francesco Nigro (Jira)
Francesco Nigro created KAFKA-13900:
---

 Summary: Support Java 9 direct ByteBuffer Checksum methods
 Key: KAFKA-13900
 URL: https://issues.apache.org/jira/browse/KAFKA-13900
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Affects Versions: 3.1.1
Reporter: Francesco Nigro


Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) 
(see 
https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):
 kafka already provides specific support for Java 9's Cr32C, hence it makes 
sense it's going to use the most optimized version of it, in case fed 
ByteBuffer is direct (read-only or not), instead of performing a computation 
byte-per-byte.

 

I'm aware that currently the client's Buffer pools aren't using direct 
ByteBuffer, but having full support for it can open the door to future 
interesting optimizations on it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872646217


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##
@@ -248,80 +182,43 @@ public void process(final Record<KIn, VIn> record) {
 }
 }
 
-tryEmitFinalResult(record, windowCloseTime);
+maybeMeasureEmitFinalLatency(record, windowCloseTime);
 }
 
-private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long windowCloseTime) {
-if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
-return;
-}
-
-final long now = internalProcessorContext.currentSystemTimeMs();
-// Throttle emit frequency as an optimization, the tradeoff is 
that we need to remember the
-// window close time when we emitted last time so that we can 
restart from there in the next emit
-if (now < timeTracker.nextTimeToEmit) {
-return;
-}
-
-// Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
-// this can be triggered every time
-timeTracker.nextTimeToEmit = now;
-timeTracker.advanceNextTimeToEmit();
-
-// Window close time has not progressed, there will be no windows 
to close hence no records to emit
-if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitWindowCloseTime >= windowCloseTime) {
+@Override
+protected void maybeForwardFinalResult(final Record<KIn, VIn> record, 
final long windowCloseTime) {
+if (!shouldEmitFinal(windowCloseTime)) {
 return;
 }
 
 final long emitRangeUpperBoundInclusive = windowCloseTime - 
windows.size();
-// No window has ever closed and hence no need to emit any records
 if (emitRangeUpperBoundInclusive < 0) {
+// If emitRangeUpperBoundInclusive is 0, it means first window 
closes since windowEndTime
+// is exclusive
 return;
 }
 
-
-// Set emitRangeLowerBoundInclusive to -1L if 
lastEmitWindowCloseTime was not set so that
-// we would fetch from 0L for the first time; otherwise set it to 
lastEmitWindowCloseTime - windows.size().
-//
-// Note if we get here, it means emitRangeUpperBoundInclusive > 0, 
which means windowCloseTime > windows.size(),
-// Because we always set lastEmitWindowCloseTime to 
windowCloseTime before, it means
-// lastEmitWindowCloseTime - windows.size() should always > 0
-// As a result, emitRangeLowerBoundInclusive is always >= 0
+// Because we only get here when emitRangeUpperBoundInclusive > 0 
which means closeTime > windows.size()
+// Since we set lastEmitCloseTime to closeTime before storing to 
processor metadata
+// lastEmitCloseTime - windows.size() is always > 0
+// Set emitRangeLowerBoundInclusive to -1L if not set so that when 
we fetchAll, we fetch from 0L
 final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime 
== ConsumerRecord.NO_TIMESTAMP ?
 -1L : lastEmitWindowCloseTime - windows.size();

Review Comment:
   I've made some refactoring, LMK what do you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs

2022-05-13 Thread GitBox


hachikuji opened a new pull request, #12162:
URL: https://github.com/apache/kafka/pull/12162

   In the AlterConfigs/IncrementalAlterConfigs zk handler, we return 
`INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in 
`LogConfig.validate`. We may throw either `ConfigException` or 
`InvalidConfigException`. When the first of these is thrown, we catch it and 
convert to INVALID_REQUEST. If the latter is thrown, then we return 
`INVALID_CONFIG`. It seems more appropriate to return INVALID_CONFIG 
consistently. Note that the KRaft implementation already does this.
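
   A hedged sketch of the normalization (the zk handler itself is Scala; this 
is an illustrative Java equivalent, not the actual patch):
   ```java
   import java.util.Properties;
   import org.apache.kafka.common.config.ConfigException;
   import org.apache.kafka.common.errors.InvalidConfigurationException;

   // Map ConfigException from validation to InvalidConfigurationException so
   // both failure paths surface INVALID_CONFIG to the client.
   static void validateOrThrowInvalidConfig(final Properties props) {
       try {
           LogConfig.validate(props); // kafka.log.LogConfig; may throw either type
       } catch (final ConfigException e) {
           throw new InvalidConfigurationException(e.getMessage());
       }
   }
   ```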
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872636671


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##
@@ -467,13 +459,35 @@ private boolean rightWindowIsNotEmpty(final 
ValueAndTimestamp<VAgg> rightWinAgg,
 return rightWinAgg != null && rightWinAgg.timestamp() > 
inputRecordTimestamp;
 }
 
+@Override
+protected void maybeForwardFinalResult(final Record<KIn, VIn> record, 
final long windowCloseTime) {
+if (!shouldEmitFinal(windowCloseTime)) {
+return;
+}
+
+final long emitRangeUpperBoundExclusive = windowCloseTime - 
windows.timeDifferenceMs();
+
+if (emitRangeUpperBoundExclusive <= 0) {
+// Sliding window's start and end timestamps are inclusive, so
+// the window is not closed if emitRangeUpperBoundExclusive is 
0,
+// and we shouldn't emit
+return;
+}
+
+final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime 
== ConsumerRecord.NO_TIMESTAMP ?
+0L : lastEmitWindowCloseTime - windows.timeDifferenceMs();

Review Comment:
   So far it seems `lastEmitWindowCloseTime` should always be no smaller than 
window size in either sliding or time windows, but when there are bugs it's 
possible that the value read from the processor metadata is smaller. I will 
update it accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y opened a new pull request, #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-05-13 Thread GitBox


jnh5y opened a new pull request, #12161:
URL: https://github.com/apache/kafka/pull/12161

   This PR adds the ability to pause and resume KafkaStreams instances as well 
as named/modular topologies.
   
   Added an integration test to show how pausing and resuming works.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] andymg3 opened a new pull request, #12160: KAFKA-13889: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL

2022-05-13 Thread GitBox


andymg3 opened a new pull request, #12160:
URL: https://github.com/apache/kafka/pull/12160

   ### JIRA
   https://issues.apache.org/jira/browse/KAFKA-13889
   
   ### Description
   - Fixes `AclsDelta` to handle `ACCESS_CONTROL_ENTRY_RECORD` quickly followed 
by `REMOVE_ACCESS_CONTROL_ENTRY_RECORD` for same ACL
   - As explained in the JIRA, in 
https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64
 we store the pending deletion in the `changes` Map. This could override a 
creation that might have just happened. This is an issue because in 
`BrokerMetadataPublisher` this results in us making a `removeAcl` call which 
finally results in 
https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203
 being executed and this code throws an exception if the ACL isn't in the Map 
yet. If the `ACCESS_CONTROL_ENTRY_RECORD` event never got processed by 
`BrokerMetadataPublisher` then the ACL won't be in the Map yet.
   - So the fix here is to remove the entry from the `changes` Map if the ACL 
doesn't exist in the image yet (see the sketch below). 
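
   A hedged sketch of that bookkeeping change (names follow `AclsDelta` 
loosely; illustrative, not the exact patch):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.image.AclsImage;
   import org.apache.kafka.metadata.authorizer.StandardAcl;

   // When a removal arrives for an ACL that was created inside this same
   // delta and never reached the image, cancel the pending creation instead
   // of recording a deletion the publisher cannot apply.
   class AclsDeltaSketch {
       private final AclsImage image;
       private final Map<Uuid, Optional<StandardAcl>> changes = new HashMap<>();

       AclsDeltaSketch(final AclsImage image) {
           this.image = image;
       }

       void replayRemoval(final Uuid id) {
           if (image.acls().containsKey(id)) {
               changes.put(id, Optional.empty()); // pending deletion
           } else {
               changes.remove(id); // creation never published; drop it
           }
       }
   }
   ```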
   
   ### Testing
   - Added unit tests for new behavior
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect

2022-05-13 Thread yingquan he (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yingquan he closed KAFKA-13871.
---

> The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the 
> RaftConfig class is incorrect
> --
>
> Key: KAFKA-13871
> URL: https://issues.apache.org/jira/browse/KAFKA-13871
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: yingquan he
>Assignee: lqjacklee
>Priority: Minor
>
> The syntax of the field QUORUM_FETCH_TIMEOUT_MS_DOC is incorrect. `a 
> election`  should be changed to `an election`.
>  
> {code:java}
> public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time 
> without a successful fetch from " +
> "the current leader before becoming a candidate and triggering a election 
> for voters; Maximum time without " +
> "receiving fetch from a majority of the quorum before asking around to 
> see if there's a new epoch for leader"; {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13889) Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL

2022-05-13 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-13889:
-
Summary: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly 
followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL  (was: Update 
AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by 
REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL)

> Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by 
> REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
> ---
>
> Key: KAFKA-13889
> URL: https://issues.apache.org/jira/browse/KAFKA-13889
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64]
>  we store the pending deletion in the changes map. This could override a 
> creation that might have just happened. This is an issue because in 
> BrokerMetadataPublisher this results in us making a removeAcl call which 
> finally results in 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203]
>  being executed and this code throws an exception if the ACL isn't in the Map 
> yet. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by 
> BrokerMetadataPublisher then the ACL won't be in the Map yet.
> My feeling is we might want to make removeAcl idempotent in that it returns 
> success if the ACL doesn't exist: no matter how many times removeAcl is 
> called it returns success if the ACL is deleted. Maybe we’d just log a 
> warning or something?
> Note, I don't think the AclControlManager has this issue because it doesn't 
> batch the events like AclsDelta does. However, we still do throw a 
> RuntimeException here 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L197]
>  - maybe we should still follow the same logic (if we make the fix suggested 
> above) and just log a warning if the ACL doesn't exist in the Map?
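
As a rough illustration of the idempotent removeAcl proposed above (the map and 
type names are stand-ins, not the real StandardAuthorizerData API, which keeps 
StandardAcl values keyed by Uuid):

{code:java}
import java.util.concurrent.ConcurrentHashMap;

class IdempotentAclStore {
    private final ConcurrentHashMap<String, String> aclsById = new ConcurrentHashMap<>();

    void removeAcl(final String id) {
        if (aclsById.remove(id) == null) {
            // Instead of throwing a RuntimeException when the ACL is absent,
            // treat the delete as already applied and only log a warning.
            System.err.println("removeAcl: ACL " + id + " not found; treating as already deleted");
        }
    }
}
{code}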



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872607460


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg>
+        extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
+
+    private final Time time = Time.SYSTEM;
+    private final String storeName;
+    private final EmitStrategy emitStrategy;
+    private final boolean sendOldValues;
+    protected final TimeTracker timeTracker = new TimeTracker();
+
+    private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
+    protected TimestampedWindowStore<KIn, VAgg> windowStore;
+    protected Sensor droppedRecordsSensor;
+    protected Sensor emittedRecordsSensor;
+    protected Sensor emitFinalLatencySensor;
+    protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
+    protected InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+
+    protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName,
+                                                          final EmitStrategy emitStrategy,
+                                                          final boolean sendOldValues) {
+        this.storeName = storeName;
+        this.emitStrategy = emitStrategy;
+        this.sendOldValues = sendOldValues;
+    }
+
+    @Override
+    public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
+        super.init(context);
+        internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
+        final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
+        final String threadId = Thread.currentThread().getName();
+        droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
+            internalProcessorContext.currentNode().name(), metrics);
+        emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
+            internalProcessorContext.currentNode().name(), metrics);
+        windowStore = context.getStateStore(storeName);
+
+        if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+            // Restore last emit close time for ON_WINDOW_CLOSE strategy
+            final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName);
+  
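   The quoted diff cuts off here in the archive. For context, a hedged, 
self-contained sketch of the ON_WINDOW_CLOSE ("emit final") idea this PR series 
implements; a TreeMap stands in for the real window store and all names are 
illustrative:
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   final class EmitFinalSketch {
       private final TreeMap<Long, String> finalResultByWindowEnd = new TreeMap<>();
       private long lastEmittedWindowCloseTime = Long.MIN_VALUE;
   
       void put(final long windowEnd, final String aggregate) {
           finalResultByWindowEnd.put(windowEnd, aggregate);   // running aggregate per window
       }
   
       // Emit each closed window exactly once; "observedStreamTime" and
       // "gracePeriodMs" mirror the concepts discussed in the review.
       void maybeEmitFinal(final long observedStreamTime, final long gracePeriodMs) {
           final long windowCloseTime = observedStreamTime - gracePeriodMs;
           if (windowCloseTime <= lastEmittedWindowCloseTime) {
               return;                                         // nothing new has closed
           }
           for (final Map.Entry<Long, String> entry : finalResultByWindowEnd
                   .subMap(lastEmittedWindowCloseTime, false, windowCloseTime, true).entrySet()) {
               System.out.println("final result for window ending at " + entry.getKey()
                   + ": " + entry.getValue());
           }
           lastEmittedWindowCloseTime = windowCloseTime;
       }
   }
   ```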

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872604769


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872603611


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872602138


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872601055


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872600795


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[jira] [Updated] (KAFKA-13889) Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL

2022-05-13 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-13889:
-
Summary: Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly 
followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL  (was: Broker can't 
handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by 
REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL)

> Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by 
> REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
> --
>
> Key: KAFKA-13889
> URL: https://issues.apache.org/jira/browse/KAFKA-13889
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64]
>  we store the pending deletion in the changes map. This could override a 
> creation that might have just happened. This is an issue because in 
> BrokerMetadataPublisher this results in us making a removeAcl call which 
> finally results in 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203]
>  being executed and this code throws an exception if the ACL isn't in the Map 
> yet. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by 
> BrokerMetadataPublisher then the ACL won't be in the Map yet.
> My feeling is we might want to make removeAcl idempotent in that it returns 
> success if the ACL doesn't exist: no matter how many times removeAcl is 
> called it returns success if the ACL is deleted. Maybe we’d just log a 
> warning or something?
> Note, I don't think the AclControlManager has this issue because it doesn't 
> batch the events like AclsDelta does. However, we still do throw a 
> RuntimeException here 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L197]
>  - maybe we should still follow the same logic (if we make the fix suggested 
> above) and just log a warning if the ACL doesn't exist in the Map?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872596894


   [Same quoted excerpt of AbstractKStreamTimeWindowAggregateProcessor.java as 
above; the review comment body is truncated in the archive.]

[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-13 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r872596541


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -53,7 +55,7 @@ public class ClusterConfig {
 
 ClusterConfig(Type type, int brokers, int controllers, String name, 
boolean autoStart,
   SecurityProtocol securityProtocol, String listenerName, File 
trustStoreFile,
-  String ibp) {
+  String ibp, MetadataVersion metadataVersion) {

Review Comment:
   Hm, actually I think I'll remove the ibp part from the annotation and just 
use MetadataVersion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872593623


##
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##
@@ -19,15 +19,52 @@
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This interface controls the strategy that can be used to control how we 
emit results in a processor.
  */
 public interface EmitStrategy {
 
+Logger log = LoggerFactory.getLogger(EmitStrategy.class);
+
 enum StrategyType {
-ON_WINDOW_CLOSE,
-ON_WINDOW_UPDATE
+ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()),
+ON_WINDOW_CLOSE(1, new WindowCloseStrategy());
+
+private final short code;
+private final EmitStrategy strategy;
+
+private short code() {
+return this.code;
+}
+
+private EmitStrategy strategy() {
+return this.strategy;
+}
+
+StrategyType(final int code, final EmitStrategy strategy) {
+this.code = (short) code;
+this.strategy = strategy;
+}
+
+private final static Map<Short, EmitStrategy> TYPE_TO_STRATEGY = new HashMap<>();
+
+static {
+    for (final StrategyType type : StrategyType.values()) {
+        if (TYPE_TO_STRATEGY.put(type.code(), type.strategy()) != null)
+            throw new IllegalStateException("Code " + type.code() + " for type " +

Review Comment:
   I was following the enum `Errors` as well to add this guard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-13 Thread GitBox


guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872593270


##
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##
@@ -19,15 +19,52 @@
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This interface controls the strategy that can be used to control how we 
emit results in a processor.
  */
 public interface EmitStrategy {
 
+Logger log = LoggerFactory.getLogger(EmitStrategy.class);
+
 enum StrategyType {
-ON_WINDOW_CLOSE,
-ON_WINDOW_UPDATE
+ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()),
+ON_WINDOW_CLOSE(1, new WindowCloseStrategy());
+
+private final short code;
+private final EmitStrategy strategy;
+
+private short code() {
+return this.code;
+}
+
+private EmitStrategy strategy() {
+return this.strategy;
+}
+
+StrategyType(final int code, final EmitStrategy strategy) {

Review Comment:
   This is because Java's default type is int -- i.e. line 36/37 above would 
take the value 0/1 as int. So we basically need to either do the conversion in 
each line of 36/37 above, or just do the conversion once here.
   
   I followed our other enums like `Errors` to do the conversion here.
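   
   A minimal standalone illustration of that pattern (a hypothetical enum, not 
the Kafka code):
   
   ```java
   // Constants pass int literals; the constructor narrows to short once,
   // instead of repeating (short) 0, (short) 1 at every constant.
   enum Code {
       A(0), B(1);
   
       private final short code;
   
       Code(final int code) {
           this.code = (short) code;
       }
   
       short code() {
           return code;
       }
   }
   ```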



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13899) Inconsistent error codes returned from AlterConfig APIs

2022-05-13 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13899:
---

 Summary: Inconsistent error codes returned from AlterConfig APIs
 Key: KAFKA-13899
 URL: https://issues.apache.org/jira/browse/KAFKA-13899
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


In the AlterConfigs/IncrementalAlterConfigs zk handler, we return 
INVALID_REQUEST and INVALID_CONFIG inconsistently. The problem is in 
`LogConfig.validate`. We may throw either `ConfigException` or 
`InvalidConfigException`. When the first of these is thrown, we catch it and 
convert to INVALID_REQUEST. It seems more consistent to convert to 
INVALID_CONFIG.

Note that the kraft implementation returns INVALID_CONFIG consistently.
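
A sketch of the proposed alignment (ApiError, Errors, ConfigException, and 
InvalidConfigurationException are Kafka's real public classes; the validation 
callback is a hypothetical stand-in for the LogConfig.validate call):

{code:java}
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

class ConfigValidation {
    // Map both validation exception types to INVALID_CONFIG, matching KRaft.
    static ApiError validate(final Runnable validateLogConfig) {
        try {
            validateLogConfig.run();          // e.g. LogConfig.validate(props)
            return ApiError.NONE;
        } catch (ConfigException | InvalidConfigurationException e) {
            return new ApiError(Errors.INVALID_CONFIG, e.getMessage());
        }
    }
}
{code}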



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536747#comment-17536747
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872567579


##
.htaccess:
##
@@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2]
 RewriteRule ^/?(\d+)/images/ - [S=1]
 RewriteCond $2 !=protocol
 RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE]
+RewriteCond %{REQUEST_FILENAME}.html -f
+RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html

Review Comment:
   Added a rewrite rule: it serves the matching `.html` file when one exists.
   
   I do have a question: is this file used in production?
   
   If it's not, then this change should be fine; if it is, then maybe we need 
someone familiar with it to review it, since I have never operated httpd and 
might be missing something.





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536746#comment-17536746
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872567579


##
.htaccess:
##
@@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2]
 RewriteRule ^/?(\d+)/images/ - [S=1]
 RewriteCond $2 !=protocol
 RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE]
+RewriteCond %{REQUEST_FILENAME}.html -f
+RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html

Review Comment:
   Added a rewrite rule: it serves the matching `.html` file when one exists.
   
   I do have a question: is this file used in production?





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536742#comment-17536742
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126221013

   Hi @mimaison, sorry for the back and forth. After some fiddling, I realized 
the problem is that when requesting `localhost:8080/path`, httpd tries to find 
a file named `path`, which does not exist; what we really want is to resolve 
`path` to `path.html` in the project root.
   
   I ended up solving this by adding a rewrite rule in the `.htaccess` file, but 
I don't know how it is solved in the real website deployment. Is `.htaccess` 
actually used in production?




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-5074) Transition to OnlinePartition without preferred leader in ISR fails

2022-05-13 Thread Paul Dubuc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536721#comment-17536721
 ] 

Paul Dubuc commented on KAFKA-5074:
---

I see that this ticket is still open and unresolved.  Is there a particular 
version of Kafka where this problem has been fixed?  Or is there a workaround 
method for taking a node down that avoids this problem?  Thanks.

> Transition to OnlinePartition without preferred leader in ISR fails
> ---
>
> Key: KAFKA-5074
> URL: https://issues.apache.org/jira/browse/KAFKA-5074
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> Running 0.9.0.0, the controller can get into a state where it no longer is 
> able to elect a leader for an Offline partition. It's unclear how this state 
> is first achieved but in the steady state, this happens:
> -There are partitions with a leader of -1
> -The Controller repeatedly attempts a preferred leader election for these 
> partitions
> -The preferred leader election fails because the only replica in the ISR is 
> not the preferred leader
> The log cycle looks like this:
> {code}
> [2017-04-12 18:00:18,891] INFO [Controller 8]: Starting preferred replica 
> leader election for partitions topic,1
> [2017-04-12 18:00:18,891] INFO [Partition state machine on Controller 8]: 
> Invoking state change to OnlinePartition for partitions topic,1
> [2017-04-12 18:00:18,892] INFO [PreferredReplicaPartitionLeaderSelector]: 
> Current leader -1 for partition [topic,1] is not the preferred replica. 
> Trigerring preferred replica leader election 
> (kafka.controller.PreferredReplicaPartitionLeaderSelector)
> [2017-04-12 18:00:18,893] WARN [Controller 8]: Partition [topic,1] failed to 
> complete preferred replica leader election. Leader is -1 
> (kafka.controller.KafkaController)
> {code}
> It's not clear if this would happen on versions later than 0.9.0.0.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536718#comment-17536718
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126166013

   Whoops, this might be a regression. Sorry, let me check; I will move this 
into draft in the meantime.
   
   I agree, it is better to make it work for the whole site.




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] fvaleri opened a new pull request, #12159: Fix stuck SSL tests in case of authentication failure

2022-05-13 Thread GitBox


fvaleri opened a new pull request, #12159:
URL: https://github.com/apache/kafka/pull/12159

   When there is an authentication error after the initial TCP connection,
   the selector never becomes READY, and these tests wait forever for that state.
   
   This is actually what happened to me while using an OpenJDK build that does
   not support the required cipher suites.
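   
   A hedged sketch of the idea (not the PR's actual change): bound the wait so 
an authentication failure fails the test instead of hanging it. The two 
predicates are hypothetical stand-ins for the selector checks.
   
   ```java
   import java.util.function.BooleanSupplier;
   
   final class BoundedReadyWait {
       static void awaitReady(final BooleanSupplier channelReady,
                              final BooleanSupplier authFailed,
                              final long timeoutMs) throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           while (!channelReady.getAsBoolean()) {
               if (authFailed.getAsBoolean()) {
                   throw new AssertionError("authentication failed before the channel became READY");
               }
               if (System.currentTimeMillis() > deadline) {
                   throw new AssertionError("timed out waiting for the channel to become READY");
               }
               Thread.sleep(50);
           }
       }
   }
   ```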


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13898) metrics.recording.level is underdocumented

2022-05-13 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-13898:
---

 Summary: metrics.recording.level is underdocumented
 Key: KAFKA-13898
 URL: https://issues.apache.org/jira/browse/KAFKA-13898
 Project: Kafka
  Issue Type: Improvement
  Components: docs
Reporter: Tom Bentley


metrics.recording.level is only briefly described in the documentation. In 
particular, the recording level associated with each metric is not documented, 
which makes it difficult to know the effect of changing the level.





--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-13 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r872493845


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -927,6 +984,29 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 }
 }
 
+/**
+ * A callback for changes to feature levels including metadata.version. 
This is called synchronously from
+ * {@link FeatureControlManager#replay(FeatureLevelRecord)} which is part 
of a ControllerWriteEvent. It is safe
+ * to modify controller state here. By the time this listener is called, a 
FeatureLevelRecord has been committed and
+ * the in-memory state of FeatureControlManager has been updated.
+ */
+class QuorumFeatureListener implements FeatureLevelListener {
+@Override
+public void handle(String featureName, short finalizedVersion) {

Review Comment:
   If we need to react to a feature level change in one of the control 
managers, it will probably be easier to do it via QuorumController rather than 
adding dependencies to FeatureControlManager. But, we don't have any use cases 
yet so I guess we don't really need the listener thing yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Gerrrr commented on pull request #12122: WIP: Upgrade tests for KAFKA-13769

2022-05-13 Thread GitBox


Gerrrr commented on PR #12122:
URL: https://github.com/apache/kafka/pull/12122#issuecomment-1126143043

   Green CI run with system tests - 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4897/.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536685#comment-17536685
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mimaison commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126118797

   When I run this locally, I can only access pages under `Docs`; for anything 
else I get a 404.
   
   Is that expected? It would be nice to preview the complete website.




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] joel-hamill commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers

2022-05-13 Thread GitBox


joel-hamill commented on code in PR #12153:
URL: https://github.com/apache/kafka/pull/12153#discussion_r872430394


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -894,7 +894,7 @@ object KafkaConfig {
 "record batch size accepted by the broker is defined via 
message.max.bytes (broker config) or " +
 "max.message.bytes (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate 
messages from a source broker. " +

Review Comment:
   I have updated the PR: 
https://github.com/apache/kafka/pull/12153/commits/0957f0881c7638c533ff2f0ffbc083f58448bc79



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joel-hamill commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers

2022-05-13 Thread GitBox


joel-hamill commented on code in PR #12153:
URL: https://github.com/apache/kafka/pull/12153#discussion_r872429806


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -894,7 +894,7 @@ object KafkaConfig {
 "record batch size accepted by the broker is defined via 
message.max.bytes (broker config) or " +
 "max.message.bytes (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate 
messages from a source broker. " +
-  "Increasing this value can increase the degree of I/O parallelism in the 
follower broker."
+  "Increasing this value can increase the degree of I/O parallelism in the 
follower and leader broker at the cost of higher CPU utilization."

Review Comment:
   Okay, I have updated it based on the feedback below from @hachikuji.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-13 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r872423670


##
metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.controller.util.SnapshotFileReader;
+import org.apache.kafka.controller.util.SnapshotFileWriter;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * A read-only class that holds the controller bootstrap metadata. A file 
named "bootstrap.snapshot" is used and the
+ * format is the same as a KRaft snapshot.
+ */
+public class BootstrapMetadata {
+private static final Logger log = 
LoggerFactory.getLogger(BootstrapMetadata.class);
+
+public static final String BOOTSTRAP_FILE = "bootstrap.snapshot";
+
+private final MetadataVersion metadataVersion;
+
+private final List<ApiMessageAndVersion> records;
+
+BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+this.metadataVersion = metadataVersion;
+this.records = Collections.unmodifiableList(records);
+}
+
+public MetadataVersion metadataVersion() {
+return this.metadataVersion;
+}
+
+public List<ApiMessageAndVersion> records() {
+return records;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+BootstrapMetadata metadata = (BootstrapMetadata) o;
+return metadataVersion == metadata.metadataVersion;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(metadataVersion);
+}
+
+@Override
+public String toString() {
+return "BootstrapMetadata{" +
+"metadataVersion=" + metadataVersion +
+'}';
+}
+
+/**
+ * A raft client listener that simply collects all of the commits and 
snapshots into a mapping of
+ * metadata record type to list of records.
+ */
+private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
+private final List<ApiMessageAndVersion> records = new ArrayList<>();
+
+@Override
+public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+try {
+while (reader.hasNext()) {
+Batch batch = reader.next();
+records.addAll(batch.records());
+}
+} finally {
+reader.close();
+}
+}
+
+@Override
+public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+try {
+while (reader.hasNext()) {
+Batch batch = reader.next();
+for (ApiMessageAndVersion messageAndVersion : batch) {
+records.add(messageAndVersion);
+}
+}
+} finally {
+reader.close();
+}
+}
+}
+
+public static BootstrapMetadata create(MetadataVersion metadataVersion) {
+return create(metadataVersion, new ArrayList<>());
+}
+
+public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+if (!metadataVersion.isKRaftSupported()) {
+throw new IllegalArgumentException("Cannot create 
BootstrapMetadata with a non-KRaft metadata version.");
+}
+records.add(new ApiMessageAndVersion(
+new FeatureLevelRecord()
+

[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-13 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r872417278


##
metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.controller.util.SnapshotFileReader;
+import org.apache.kafka.controller.util.SnapshotFileWriter;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * A read-only class that holds the controller bootstrap metadata. A file 
named "bootstrap.snapshot" is used and the
+ * format is the same as a KRaft snapshot.
+ */
+public class BootstrapMetadata {
+private static final Logger log = 
LoggerFactory.getLogger(BootstrapMetadata.class);
+
+public static final String BOOTSTRAP_FILE = "bootstrap.snapshot";
+
+private final MetadataVersion metadataVersion;
+
+private final List<ApiMessageAndVersion> records;
+
+BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+this.metadataVersion = metadataVersion;
+this.records = Collections.unmodifiableList(records);
+}
+
+public MetadataVersion metadataVersion() {
+return this.metadataVersion;
+}
+
+public List<ApiMessageAndVersion> records() {
+return records;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+BootstrapMetadata metadata = (BootstrapMetadata) o;
+return metadataVersion == metadata.metadataVersion;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(metadataVersion);
+}
+
+@Override
+public String toString() {
+return "BootstrapMetadata{" +
+"metadataVersion=" + metadataVersion +
+'}';
+}
+
+/**
+ * A raft client listener that simply collects all of the commits and 
snapshots into a mapping of
+ * metadata record type to list of records.
+ */
+private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
+private final List<ApiMessageAndVersion> records = new ArrayList<>();
+
+@Override
+public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+try {
+while (reader.hasNext()) {
+Batch batch = reader.next();
+records.addAll(batch.records());
+}
+} finally {
+reader.close();
+}
+}
+
+@Override
+public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+try {
+while (reader.hasNext()) {
+Batch batch = reader.next();
+for (ApiMessageAndVersion messageAndVersion : batch) {
+records.add(messageAndVersion);
+}
+}
+} finally {
+reader.close();
+}
+}
+}
+
+public static BootstrapMetadata create(MetadataVersion metadataVersion) {
+return create(metadataVersion, new ArrayList<>());
+}
+
+public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+if (!metadataVersion.isKRaftSupported()) {
+throw new IllegalArgumentException("Cannot create 
BootstrapMetadata with a non-KRaft metadata version.");
+}
+records.add(new ApiMessageAndVersion(
+new FeatureLevelRecord()
+

[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536616#comment-17536616
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mimaison commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872351993


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   If you know a better way to achieve the same, yes do it





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536583#comment-17536583
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   ~Also, can you explain what `read -r -d '' _ </dev/tty` does?~

> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536584#comment-17536584
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   ~Also, can you explain what `read -r -d '' _ </dev/tty` does?~

> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536574#comment-17536574
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   Also, can you explain what `read -r -d '' _ </dev/tty` does?

> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536573#comment-17536573
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872286879


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   Hi, thanks for the suggestion. I am not a shell expert so I might be missing 
something here, but I think we can achieve a similar effect by not running the 
container in detached mode.
   
   Then Ctrl+C will work; do you think that's acceptable?





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13897) Add 3.1.1 to system tests and streams upgrade tests

2022-05-13 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-13897:
---

 Summary: Add 3.1.1 to system tests and streams upgrade tests
 Key: KAFKA-13897
 URL: https://issues.apache.org/jira/browse/KAFKA-13897
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: Tom Bentley
 Fix For: 3.3.0, 3.1.2, 3.2.1


Per the penultimate bullet on the [release 
checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
 Kafka v3.1.1 has been released. We should add this version to the system tests 
and streams upgrade tests.





--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536544#comment-17536544
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mimaison commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872225024


##
start-preview.sh:
##
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+set -euxo pipefail
+
+docker build -t kafka-site-preview .
+
+docker run -dit --rm --name mypreview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
+
+echo "You can stop the preview server by running `docker stop mypreview`"

Review Comment:
   What about doing something like this to make it even easier to use?
   
   ```suggestion
   #!/usr/bin/env bash
   
   set -euo pipefail
   
   clean() {
 docker stop site-preview
 exit 0
   }
   
   trap clean SIGINT
   
   docker build -t kafka-site-preview .
   
   docker run -dit --rm --name site-preview -p 8080:80 -v 
"$PWD":/usr/local/apache2/htdocs/ kafka-site-preview
   
   echo "The preview is available on http://localhost:8080.;
   echo "Press CTRL+C to stop it."
   
   read -r -d '' _  Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] bozhao12 closed pull request #12022: MINOR:Remove repeated variable reset in KafkaController.scala

2022-05-13 Thread GitBox


bozhao12 closed pull request #12022: MINOR:Remove repeated variable reset in 
KafkaController.scala
URL: https://github.com/apache/kafka/pull/12022


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bozhao12 opened a new pull request, #12158: MINOR:A few code cleanUps in KafkaController

2022-05-13 Thread GitBox


bozhao12 opened a new pull request, #12158:
URL: https://github.com/apache/kafka/pull/12158

   The following variables in KafkaController are used for metrics:
   ```
   offlinePartitionCount
   preferredReplicaImbalanceCount
   globalTopicCount
   globalPartitionCount
   topicsToDeleteCount
   replicasToDeleteCount
   ineligibleTopicsToDeleteCount
   ineligibleReplicasToDeleteCount
   ```
   When the controller goes from active to non-active, these variables are reset 
to 0. Currently, the reset is performed in both 
`KafkaController.onControllerResignation()` and `KafkaController.updateMetrics()`.
   In fact, whether the controller is active or not, as long as it receives an 
event related to a controller change, `KafkaController.updateMetrics()` is 
executed and decides whether to reset the above variables. So the reset in 
`KafkaController.onControllerResignation()` can be removed.
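
   A schematic of the pattern described above (plain Java for illustration, not 
the actual Scala controller code; field and method names mirror the 
description):
   
   ```java
   class ControllerMetricsSketch {
       private volatile int offlinePartitionCount;
       private volatile int globalTopicCount;
       // ... the remaining gauges listed above
   
       // Invoked on every controller-change event, active or not.
       void updateMetrics(boolean isActiveController) {
           if (!isActiveController) {
               // A non-active controller zeroes all gauges here, which is why a
               // separate reset in onControllerResignation() is redundant.
               offlinePartitionCount = 0;
               globalTopicCount = 0;
               return;
           }
           // Active controller: recompute the gauges from current cluster state.
       }
   }
   ```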
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-12549) Allow state stores to opt-in transactional support

2022-05-13 Thread Alex Sorokoumov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Sorokoumov reassigned KAFKA-12549:
---

Assignee: Alex Sorokoumov

> Allow state stores to opt-in transactional support
> --
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Alex Sorokoumov
>Priority: Major
>
> Right now Kafka Stream's EOS implementation does not make any assumptions 
> about the state store's transactional support. Allowing the state stores to 
> optionally provide transactional support can have multiple benefits. E.g., if 
> we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, 
> {{commitTxn}} and {{abortTxn}}. Streams library can determine if these are 
> supported via an additional {{boolean transactional()}} API, and if yes then 
> these APIs can be used under both ALOS and EOS like the following (otherwise 
> just fall back to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like above, we can have the 
> following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is still not 
> EOS, but some middle ground where uncommitted data within a state store 
> would not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes 
> for EOS. E.g., if a crash-failure happened between streams commit completes 
> and store.commitTxn. We can instead just roll-forward the transaction by 
> replaying the changelog from the second recent streams committed offset 
> towards the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping 
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see 
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself have natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon 
> `commitTxn` write the whole buffer as a batch to the underlying state store, 
> or just drop the whole buffer upon aborting. Then for interactive queries, 
> one can optionally only query the underlying store for committed data only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn` 
> create a new empty transient store, and upon `commitTxn` merge the store into 
> the underlying store. Same applies for interactive querying committed-only 
> data. This has a benefit compared with the one above that there's no memory 
> pressure even with long transactions, but incurs more complexity / 
> performance overhead with the separate persistent store.
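
A minimal Java sketch of the API shape proposed above (the names beginTxn, 
commitTxn, abortTxn and transactional() come from this ticket; everything else 
is illustrative and not the actual Kafka Streams interface):

```java
// Hypothetical store interface carrying the proposed transactional hooks.
interface TransactionalStateStore {
    boolean transactional();            // does this store support transactions?
    void beginTxn();                    // start buffering writes
    void put(byte[] key, byte[] value); // writes land in the open transaction
    void commitTxn();                   // atomically publish buffered writes
    void abortTxn();                    // discard buffered writes
}

// Sketch of the commit cycle enumerated above:
// beginTxn -> put(s) -> streams commit -> commitTxn -> next beginTxn.
class CommitCycleSketch {
    void onCommit(TransactionalStateStore store, Runnable streamsCommit) {
        if (!store.transactional()) {
            streamsCommit.run();        // fall back to normal processing
            return;
        }
        streamsCommit.run();            // 3. streams commit (EOS or not)
        store.commitTxn();              // 4. make buffered writes visible
        store.beginTxn();               // 5. open the next transaction
    }
}
```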



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica

2022-05-13 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536522#comment-17536522
 ] 

dengziming commented on KAFKA-9837:
---

[~soarez] Thank you for your proposals; I think this is what we are thinking 
about: by letting the controller record which directory hosts each replica, we 
can avoid sending an O(n) request. But I think the names 
ASSIGN_REPLICAS_TO_FAIL_GROUP and FAIL_GROUP are not clear; how about 
ASSIGN_REPLICAS_TO_DIRECTORY and FAIL_DIRECTORY?

By the way, we have decided in KIP-833 to mark KRaft as production-ready soon, 
so this KIP will be useful and important.

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers

2022-05-13 Thread GitBox


divijvaidya commented on code in PR #12153:
URL: https://github.com/apache/kafka/pull/12153#discussion_r872165577


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -894,7 +894,7 @@ object KafkaConfig {
 "record batch size accepted by the broker is defined via 
message.max.bytes (broker config) or " +
 "max.message.bytes (topic config)."
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate 
messages from a source broker. " +
-  "Increasing this value can increase the degree of I/O parallelism in the 
follower broker."
+  "Increasing this value can increase the degree of I/O parallelism in the 
follower and leader broker at the cost of higher CPU utilization."

Review Comment:
   Sure. Can't argue against actual user anecdotes. 
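
   For reference, a sketch of raising this setting at runtime via the Admin API 
(assumes num.replica.fetchers is dynamically updatable on the broker version in 
use; the bootstrap address is a placeholder):
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class BumpReplicaFetchers {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // An empty resource name targets the cluster-wide broker default.
               ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
               AlterConfigOp raiseFetchers = new AlterConfigOp(
                   new ConfigEntry("num.replica.fetchers", "4"), AlterConfigOp.OpType.SET);
               admin.incrementalAlterConfigs(Map.of(cluster, List.of(raiseFetchers))).all().get();
           }
       }
   }
   ```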



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12153: MINOR: Clarify impact of num.replica.fetchers

2022-05-13 Thread GitBox


divijvaidya commented on PR #12153:
URL: https://github.com/apache/kafka/pull/12153#issuecomment-1125823580

   @hachikuji one of the flaky failing tests in this PR is `Build / JDK 11 and 
Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – 
kafka.network.ConnectionQuotasTest`. I have fixed that test at 
https://github.com/apache/kafka/pull/12045
   
   Please take a look when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536512#comment-17536512
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r872153458


##
README.md:
##
@@ -0,0 +1,11 @@
+# How to preview the documentation changes locally?
+
+The documentation can be hosted on a local webserver via httpd.
+
+You can run it with the following command; note that it requires Docker:
+
+```shell
+sh start-preview.sh

Review Comment:
   Got it, I've updated the file permission and the readme





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] tombentley merged pull request #12156: MINOR: Update release versions for upgrade tests with 3.1.1 release

2022-05-13 Thread GitBox


tombentley merged PR #12156:
URL: https://github.com/apache/kafka/pull/12156


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming opened a new pull request, #12157: MINOR: Support co-resident mode in KRaft TestKit

2022-05-13 Thread GitBox


dengziming opened a new pull request, #12157:
URL: https://github.com/apache/kafka/pull/12157

   *More detailed description of your change*
   The behavior of co-resident mode differs from discrete mode, so we should 
support co-resident mode too. I also found a bug in co-resident mode; see 
KAFKA-13228.
   
   *Summary of testing strategy (including rationale)*
   Changed the existing clusterTests to support co-resident mode, except for 
ApiVersionsRequestTest, which is affected by that bug.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


