[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on PR #10528: URL: https://github.com/apache/kafka/pull/10528#issuecomment-1126634597 This will create other conflicts with https://github.com/apache/kafka/pull/11780. Would it be possible to prioritize that PR and then, once it's merged, fix the conflicts on this one and merge it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r872925840 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null) { +final int oldPosition = buffer.position(); +final int oldLimit = buffer.limit(); +try { +// save a slice to be used to save an allocation in the hot-path +final int start = buffer.position() + offset; +buffer.position(0); +buffer.limit(start + length); +buffer.position(start); +BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer); +return; +} catch (Throwable t) { +// fallback Review Comment: The problem is the existing `update` signature: `MethodHandle` doesn't know about the `update` methods and it just assume that any method can raise a generic `Throwable`, even if it won't happen (as it's in this case). If I leave the `Throwable` to leak, given that's a checked exception, it's going to poison the existing code using it: I can wrap it in a generic `IllegalStateException` and do it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] franz1981 commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r872924916 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null) { +final int oldPosition = buffer.position(); +final int oldLimit = buffer.limit(); +try { +// save a slice to be used to save an allocation in the hot-path +final int start = buffer.position() + offset; +buffer.position(0); +buffer.limit(start + length); +buffer.position(start); +BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer); +return; +} catch (Throwable t) { +// fallback +} finally { +// reset buffer's offsets +buffer.position(0); Review Comment: I've just read the `ByteBuffer` doc: I have moved first `position` to save `limit`'s change to throw any exception, but I see that it won't happen...let me remove this, good point :+1: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
[ https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13745. --- Resolution: Fixed > Flaky > kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone > - > > Key: KAFKA-13745 > URL: https://issues.apache.org/jira/browse/KAFKA-13745 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Blocker > Fix For: 3.3.0 > > > Example: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/ > {code} > org.opentest4j.AssertionFailedError: expected: but was: > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35) > at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227) > at > kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang merged pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang merged PR #12135: URL: https://github.com/apache/kafka/pull/12135 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
[ https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536959#comment-17536959 ] Guozhang Wang commented on KAFKA-13745: --- Hi [~sagarrao] I'm happy to resolve the ticket now, and if we see it again we can re-create. > Flaky > kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone > - > > Key: KAFKA-13745 > URL: https://issues.apache.org/jira/browse/KAFKA-13745 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Blocker > Fix For: 3.3.0 > > > Example: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/ > {code} > org.opentest4j.AssertionFailedError: expected: but was: > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35) > at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227) > at > kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
[ https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13800. --- Resolution: Fixed > Remove force cast of TimeWindowKStreamImpl in tests of > https://github.com/apache/kafka/pull/11896 > - > > Key: KAFKA-13800 > URL: https://issues.apache.org/jira/browse/KAFKA-13800 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > We can remove the cast after `emitStrategy` is added to public api -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang commented on pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on PR #12135: URL: https://github.com/apache/kafka/pull/12135#issuecomment-1126616157 Merged to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang closed pull request #12037: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang closed pull request #12037: KAFKA-13785: [7/N][Emit final] emit final for sliding window URL: https://github.com/apache/kafka/pull/12037 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872914073 ## streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java: ## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class SlidingWindowedKStreamIntegrationTest { +private static final int NUM_BROKERS = 1; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, +mkProperties( +mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp +) +); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + + +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String streamOneInput; +private String streamTwoInput; +private String outputTopic; + +@Rule +public TestName testName = new TestName(); + +@Parameter +public StrategyType type; + +@Parameter(1) +public boolean
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872913253 ## streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java: ## @@ -0,0 +1,1238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofHours; +import static java.time.Duration.ofMinutes; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues; +import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; +import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872912670 ## streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java: ## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class SlidingWindowedKStreamIntegrationTest { +private static final int NUM_BROKERS = 1; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, +mkProperties( +mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp +) +); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + + +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String streamOneInput; +private String streamTwoInput; +private String outputTopic; + +@Rule +public TestName testName = new TestName(); + +@Parameter +public StrategyType type; + +@Parameter(1) +public boolean
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872912604 ## streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java: ## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class SlidingWindowedKStreamIntegrationTest { +private static final int NUM_BROKERS = 1; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, +mkProperties( +mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp +) +); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + + +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String streamOneInput; +private String streamTwoInput; +private String outputTopic; + +@Rule +public TestName testName = new TestName(); + +@Parameter +public StrategyType type; + +@Parameter(1) +public boolean
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872912257 ## streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java: ## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class SlidingWindowedKStreamIntegrationTest { +private static final int NUM_BROKERS = 1; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, +mkProperties( +mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp +) +); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + + +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String streamOneInput; +private String streamTwoInput; +private String outputTopic; + +@Rule +public TestName testName = new TestName(); + +@Parameter +public StrategyType type; + +@Parameter(1) +public boolean
[GitHub] [kafka] hachikuji commented on a diff in pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode
hachikuji commented on code in PR #12109: URL: https://github.com/apache/kafka/pull/12109#discussion_r872906546 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2033,25 +2039,40 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { LogConfig.CompressionTypeProp -> "producer" ).asJava val newTopic = new NewTopic(topic, 2, brokerCount.toShort) -val e1 = assertThrows(classOf[ExecutionException], - () => client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all.get()) -assertTrue(e1.getCause.isInstanceOf[InvalidRequestException], - s"Unexpected exception ${e1.getCause.getClass}") +assertFutureExceptionTypeEquals( + client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all, + classOf[InvalidRequestException], + Some("Null value not supported for topic configs : retention.bytes") +) -val validConfigs = Map[String, String](LogConfig.CompressionTypeProp -> "producer").asJava - client.createTopics(Collections.singletonList(newTopic.configs(validConfigs))).all.get() +val validConfigs = Map[String, String](LogConfig.CompressionTypeProp -> "producer") + client.createTopics(Collections.singletonList(newTopic.configs(validConfigs.asJava))).all.get() waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List()) +// Only need to wait for kraft brokers +if (isKRaftTest()) { Review Comment: Could we use the new `ensureConsistentKRaftMetadata` function? ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -773,10 +773,18 @@ static void validateNewTopicNames(Map topicErrors, for (CreatableTopic topic : topics) { if (topicErrors.containsKey(topic.name())) continue; Map> topicConfigs = new HashMap<>(); +List nullConfigs = new ArrayList<>(); for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) { -topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value())); +if (config.value() == null) { +nullConfigs.add(config.name()); +} else { +topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value())); +} } -if (!topicConfigs.isEmpty()) { +if (!nullConfigs.isEmpty()) { +topicErrors.put(topic.name(), new ApiError(Errors.INVALID_REQUEST, Review Comment: On a related note, I opened https://github.com/apache/kafka/pull/12162. I wonder if we should call this INVALID_CONFIG instead? ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -773,10 +773,18 @@ static void validateNewTopicNames(Map topicErrors, for (CreatableTopic topic : topics) { if (topicErrors.containsKey(topic.name())) continue; Map> topicConfigs = new HashMap<>(); +List nullConfigs = new ArrayList<>(); for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) { -topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value())); +if (config.value() == null) { +nullConfigs.add(config.name()); +} else { +topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value())); +} } -if (!topicConfigs.isEmpty()) { +if (!nullConfigs.isEmpty()) { +topicErrors.put(topic.name(), new ApiError(Errors.INVALID_REQUEST, +"Null value not supported for topic configs : " + String.join(",", nullConfigs))); Review Comment: nit: no space before colon? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872905269 ## streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java: ## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class SlidingWindowedKStreamIntegrationTest { +private static final int NUM_BROKERS = 1; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, +mkProperties( +mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp +) +); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + + +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String streamOneInput; +private String streamTwoInput; +private String outputTopic; + +@Rule +public TestName testName = new TestName(); + +@Parameter +public StrategyType type; Review Comment: Ack! -- This is
[GitHub] [kafka] cmccabe merged pull request #12155: MINOR: convert some tests to KRaft
cmccabe merged PR #12155: URL: https://github.com/apache/kafka/pull/12155 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #12122: Upgrade tests for KAFKA-13769
mjsax merged PR #12122: URL: https://github.com/apache/kafka/pull/12122 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872901962 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ## @@ -103,59 +89,12 @@ public void enableSendingOldValues() { sendOldValues = true; } - -private class KStreamWindowAggregateProcessor extends ContextualProcessor, Change> { -private TimestampedWindowStore windowStore; -private TimestampedTupleForwarder, VAgg> tupleForwarder; -private Sensor droppedRecordsSensor; -private Sensor emittedRecordsSensor; -private Sensor emitFinalLatencySensor; +private class KStreamWindowAggregateProcessor extends AbstractKStreamTimeWindowAggregateProcessor { private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; Review Comment: Ack! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872901603 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java: ## @@ -467,13 +459,33 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp rightWinAgg, return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp; } +@Override +protected long emitRangeLowerBound(final long windowCloseTime) { +return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? +0L : Math.max(0L, lastEmitWindowCloseTime - windows.timeDifferenceMs()); +} + +@Override +protected long emitRangeUpperBound(final long windowCloseTime) { +// Sliding window's start and end timestamps are inclusive, so +// we should minus 1 for the inclusive closed window-end upper bound +return windowCloseTime - windows.timeDifferenceMs() - 1; Review Comment: I can actually be negative, and we would skip the range fetching in that case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
guozhangwang merged PR #12104: URL: https://github.com/apache/kafka/pull/12104 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872900761 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java: ## @@ -50,22 +46,32 @@ private final SlidingWindows windows; private final Initializer initializer; private final Aggregator aggregator; +private final EmitStrategy emitStrategy; private boolean sendOldValues = false; public KStreamSlidingWindowAggregate(final SlidingWindows windows, final String storeName, final Initializer initializer, final Aggregator aggregator) { +this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator); Review Comment: I replied in the other comment that this is now only used in `cogroup` which do not have emit-on-final yet, but I guess we can always just call it explicitly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872900375 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on pull request #12035: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so
guozhangwang commented on PR #12035: URL: https://github.com/apache/kafka/pull/12035#issuecomment-1126589067 Thanks @sayantanu-dey I will review asap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
[ https://issues.apache.org/jira/browse/KAFKA-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13746. --- Resolution: Fixed > Flaky > kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed > > > Key: KAFKA-13746 > URL: https://issues.apache.org/jira/browse/KAFKA-13746 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Blocker > Fix For: 3.3.0 > > > Example: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/ > {code} > java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1 > at > kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
guozhangwang commented on code in PR #12104: URL: https://github.com/apache/kafka/pull/12104#discussion_r872898659 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -732,7 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) val aliveServers = brokers.filterNot(_.config.brokerId == 0) - TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0) + + if (isKRaftTest()) { +TestUtils.ensureConsistentKRaftMetadata(aliveServers, controllerServer, "Timeout waiting for topic configs propagating to brokers") + } else { +TestUtils.waitUntilTrue( + () => aliveServers.forall( +broker => + broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < 6 && +broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID), Review Comment: Thanks for the explanation @dengziming ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
[ https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536926#comment-17536926 ] Matthias J. Sax commented on KAFKA-13817: - > as it will eventually self-correct and continue to throttle, I believe how > fast it self-correct depends on the magnitude of clock-drift and the > {color:#871094}emitIntervalMs value.{color} That's exactly the point. We want to avoid that we only throttle eventually for this case, but keep throttling right away. Thus, instead of just computing "next = next + X" we want to compute "next = now + X" to quickly fast forward in case we missed an interval. We do similar thing in the windowed aggregation: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L260-L269] > Schedule nextTimeToEmit to system time every time instead of just once > -- > > Key: KAFKA-13817 > URL: https://issues.apache.org/jira/browse/KAFKA-13817 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Lim Qing Wei >Priority: Minor > Labels: beginner, newbie > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.] > > If this is just scheduled once, this can trigger emit every time if system > time jumps a lot suddenly. > > For example, > # nextTimeToEmit set to 1 and step is 1 > # If next system time jumps to 100, we will always emit for next 100 records -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] xjin-Confluent opened a new pull request, #12164: Update upgrade.html
xjin-Confluent opened a new pull request, #12164: URL: https://github.com/apache/kafka/pull/12164 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
[ https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536895#comment-17536895 ] Lim Qing Wei commented on KAFKA-13817: -- Hi [~lihaosky] , I am new to contributing to Kafka, I wish to understand the issue better Do you mean that we might not throttle properly when clock drift happen because it relies on wall-clock? I am not sure if I fully understand your example, are you saying that we should set `{color:#871094}sharedTimeTracker{color}.{color:#871094}nextTimeToEmit` to system time {color}on every method invocation?{color:#871094} {color} I notice we always invoke the following if we dont throttle {code:java} sharedTimeTracker.advanceNextTimeToEmit(){code} This should mitigate the clock drift issue, right? as it will eventually self-correct and continue to throttle, I believe how fast it self-correct depends on the magnitude of clock-drift and the {color:#871094}emitIntervalMs value. {color} > Schedule nextTimeToEmit to system time every time instead of just once > -- > > Key: KAFKA-13817 > URL: https://issues.apache.org/jira/browse/KAFKA-13817 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Lim Qing Wei >Priority: Minor > Labels: beginner, newbie > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.] > > If this is just scheduled once, this can trigger emit every time if system > time jumps a lot suddenly. > > For example, > # nextTimeToEmit set to 1 and step is 1 > # If next system time jumps to 100, we will always emit for next 100 records -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Reopened] (KAFKA-13502) Support configuring BROKER_LOGGER on controller-only KRaft nodes
[ https://issues.apache.org/jira/browse/KAFKA-13502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio reopened KAFKA-13502: I accidentally resolved this issue. > Support configuring BROKER_LOGGER on controller-only KRaft nodes > > > Key: KAFKA-13502 > URL: https://issues.apache.org/jira/browse/KAFKA-13502 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
[ https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lim Qing Wei reassigned KAFKA-13817: Assignee: Lim Qing Wei > Schedule nextTimeToEmit to system time every time instead of just once > -- > > Key: KAFKA-13817 > URL: https://issues.apache.org/jira/browse/KAFKA-13817 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Lim Qing Wei >Priority: Minor > Labels: beginner, newbie > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.] > > If this is just scheduled once, this can trigger emit every time if system > time jumps a lot suddenly. > > For example, > # nextTimeToEmit set to 1 and step is 1 > # If next system time jumps to 100, we will always emit for next 100 records -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13903) Add queue size metric to QuorumController
[ https://issues.apache.org/jira/browse/KAFKA-13903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-13903: - Labels: kip-500 (was: ) > Add queue size metric to QuorumController > - > > Key: KAFKA-13903 > URL: https://issues.apache.org/jira/browse/KAFKA-13903 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: David Arthur >Priority: Major > Labels: kip-500 > > In order to stay in-line with existing queue metrics in > ControllerEventManager mbean, we need to include a queue size. The current > size of the underlying queue in QuorumController needs to be exposed as: > {{kafka.controller:type=ControllerEventManager,name=EventQueueSize}} > It looks like KafkaEventQueue will need to add support for reporting its size. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-13902: - Labels: kip-500 (was: ) > Support online metadata.version upgrades for KIP-704 in KRaft > -- > > Key: KAFKA-13902 > URL: https://issues.apache.org/jira/browse/KAFKA-13902 > Project: Kafka > Issue Type: Sub-task > Components: controller >Reporter: David Arthur >Priority: Minor > Labels: kip-500 > > KIP-704 landed in trunk before the KIP-778 metadata.version work that is now > in progress. The KIP-704 (unclean leader recovery) code uses the usual IBP > approach for enabling the feature on startup. > Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal > with online upgrades. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13903) Add queue size metric to QuorumController
David Arthur created KAFKA-13903: Summary: Add queue size metric to QuorumController Key: KAFKA-13903 URL: https://issues.apache.org/jira/browse/KAFKA-13903 Project: Kafka Issue Type: Bug Components: controller Reporter: David Arthur In order to stay in-line with existing queue metrics in ControllerEventManager mbean, we need to include a queue size. The current size of the underlying queue in QuorumController needs to be exposed as: {{kafka.controller:type=ControllerEventManager,name=EventQueueSize}} It looks like KafkaEventQueue will need to add support for reporting its size. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-13902: - Component/s: controller > Support online metadata.version upgrades for KIP-704 in KRaft > -- > > Key: KAFKA-13902 > URL: https://issues.apache.org/jira/browse/KAFKA-13902 > Project: Kafka > Issue Type: Sub-task > Components: controller >Reporter: David Arthur >Priority: Minor > > KIP-704 landed in trunk before the KIP-778 metadata.version work that is now > in progress. The KIP-704 (unclean leader recovery) code uses the usual IBP > approach for enabling the feature on startup. > Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal > with online upgrades. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13902) Support online metadata.version upgrades for KIP-704 in KRaft
David Arthur created KAFKA-13902: Summary: Support online metadata.version upgrades for KIP-704 in KRaft Key: KAFKA-13902 URL: https://issues.apache.org/jira/browse/KAFKA-13902 Project: Kafka Issue Type: Sub-task Reporter: David Arthur KIP-704 landed in trunk before the KIP-778 metadata.version work that is now in progress. The KIP-704 (unclean leader recovery) code uses the usual IBP approach for enabling the feature on startup. Once KAFKA-13830 is merged, we will need to update the KIP-704 code to deal with online upgrades. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] ijuma commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
ijuma commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r872799071 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null) { +final int oldPosition = buffer.position(); +final int oldLimit = buffer.limit(); +try { +// save a slice to be used to save an allocation in the hot-path +final int start = buffer.position() + offset; +buffer.position(0); +buffer.limit(start + length); +buffer.position(start); +BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer); +return; +} catch (Throwable t) { +// fallback Review Comment: Shouldn't we propagate the exception instead of falling back? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
ijuma commented on code in PR #12163: URL: https://github.com/apache/kafka/pull/12163#discussion_r872795794 ## clients/src/main/java/org/apache/kafka/common/utils/Checksums.java: ## @@ -40,10 +55,32 @@ public static void update(Checksum checksum, ByteBuffer buffer, int length) { public static void update(Checksum checksum, ByteBuffer buffer, int offset, int length) { if (buffer.hasArray()) { checksum.update(buffer.array(), buffer.position() + buffer.arrayOffset() + offset, length); -} else { -int start = buffer.position() + offset; -for (int i = start; i < start + length; i++) -checksum.update(buffer.get(i)); +return; +} +if (BYTE_BUFFER_UPDATE != null) { +final int oldPosition = buffer.position(); +final int oldLimit = buffer.limit(); +try { +// save a slice to be used to save an allocation in the hot-path +final int start = buffer.position() + offset; +buffer.position(0); +buffer.limit(start + length); +buffer.position(start); +BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer); +return; +} catch (Throwable t) { +// fallback +} finally { +// reset buffer's offsets +buffer.position(0); Review Comment: Is it intentional that we are calling `position` twice in the `finally` block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r872767805 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -897,7 +897,8 @@ private void initializeAndRestorePhase() { } // we can always let changelog reader try restoring in order to initialize the changelogs; // if there's no active restoring or standby updating it would not try to fetch any data -changelogReader.restore(taskManager.tasks()); +// After KAFKA-13873, we only restore the not paused tasks. +changelogReader.restore(taskManager.notPausedTasks()); Review Comment: "Yes." The next time that a resumed task goes through the loop it will be returned in `notPausedTasks()` and `restore` will be called. (I say "Yes" because that's not a separate, direct call to `restore` when resuming.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r872767805 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -897,7 +897,8 @@ private void initializeAndRestorePhase() { } // we can always let changelog reader try restoring in order to initialize the changelogs; // if there's no active restoring or standby updating it would not try to fetch any data -changelogReader.restore(taskManager.tasks()); +// After KAFKA-13873, we only restore the not paused tasks. +changelogReader.restore(taskManager.notPausedTasks()); Review Comment: "Yes." The next time that a resumed task goes through the loop it will be returned in `notPausedTasks()` and `restore` will be called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r872766397 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -463,7 +463,10 @@ public void restore(final Map tasks) { final TaskId taskId = changelogs.get(partition).stateManager.taskId(); try { if (restoreChangelog(changelogs.get(partition))) { -tasks.get(taskId).clearTaskTimeout(); +final Task task = tasks.get(taskId); +if (task != null) { Review Comment: The taskId is coming from the changeLogs partition info a few lines above. The tasks map returned is now smaller and does not contain all the keys anymore! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
wcarlson5 commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r872731431 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -463,7 +463,10 @@ public void restore(final Map tasks) { final TaskId taskId = changelogs.get(partition).stateManager.taskId(); try { if (restoreChangelog(changelogs.get(partition))) { -tasks.get(taskId).clearTaskTimeout(); +final Task task = tasks.get(taskId); +if (task != null) { Review Comment: Why are we getting null here when we were not before? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -273,6 +273,12 @@ Collection allTasks() { return readOnlyTasks; } +Collection notPausedTasks() { +return Collections.unmodifiableCollection(readOnlyActiveTasks.stream().filter(t -> Review Comment: nit: Can we align each call on its own line for readability? ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -0,0 +1,897 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r872705006 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: Yes, then we just need to pass in a single endpoint to AbstractFetcherThread. This seems simpler, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed
guozhangwang commented on PR #12041: URL: https://github.com/apache/kafka/pull/12041#issuecomment-1126350779 AH yes, that's clear. My concern was that it's assuming the defined properties should be all retrieved in the constructor (since the `logUnused` is called at the end of it). I think it's true for most clients --- at least I think in producer and consumer, but it may not be the case for streams. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] franz1981 commented on pull request #12163: KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods
franz1981 commented on PR #12163: URL: https://github.com/apache/kafka/pull/12163#issuecomment-1126318782 I've put it in draft mode in order to better format it according to the community's rule, but feel free to review it: content-wise should be done. Some numbers, for JDK 11: ``` Benchmark (bytes) (direct) (readonly) (seed) Mode Cnt Score Error Units Crc32CBenchamrk.checksum 128 false false 42 thrpt 20 26.922 ± 0.065 ops/us Crc32CBenchamrk.checksum 128 falsetrue 42 thrpt 20 9.321 ± 0.031 ops/us Crc32CBenchamrk.checksum 128 true false 42 thrpt 20 24.656 ± 0.620 ops/us Crc32CBenchamrk.checksum 128 truetrue 42 thrpt 20 24.429 ± 0.668 ops/us Crc32CBenchamrk.checksum 1024 false false 42 thrpt 20 6.548 ± 0.025 ops/us Crc32CBenchamrk.checksum 1024 falsetrue 42 thrpt 20 3.603 ± 0.087 ops/us Crc32CBenchamrk.checksum 1024 true false 42 thrpt 20 6.432 ± 0.136 ops/us Crc32CBenchamrk.checksum 1024 truetrue 42 thrpt 20 6.499 ± 0.042 ops/us Crc32CBenchamrk.checksum 4096 false false 42 thrpt 20 4.031 ± 0.022 ops/us Crc32CBenchamrk.checksum 4096 falsetrue 42 thrpt 20 1.511 ± 0.009 ops/us Crc32CBenchamrk.checksum 4096 true false 42 thrpt 20 4.004 ± 0.016 ops/us Crc32CBenchamrk.checksum 4096 truetrue 42 thrpt 20 4.002 ± 0.020 ops/us ``` The purpose of the PR is to makes heap and direct ByteBuffer able to perform the same (especially not read-only). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13901) Exactly once producer cannot start due to TimeoutException: Timeout expired after 300000ms awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-13901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Augarten updated KAFKA-13901: - Description: I'm currently running into this problem running org.apache.kafka:kafka and org.apache.kafka:kafka-client version 2.5.1. The symptoms of this problem are very similar to https://issues.apache.org/jira/browse/KAFKA-8803 I'm currently running a test that runs one embedded Kafka broker and an embedded Flink cluster (version 1.13). The Flink application uses an exactly once Kafka producer. When initializing, the FlinkKafkaProducer calls [initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285] for each of ~50 underlying KafkaProducers. On some executions of the test (but not all of them), one of these calls to InitProducerId times out. Based on my reading of the logs and some debugging sessions, it seems that the failing producer continually tries to issue FindCoordinatorRequests, but [Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526] returns null. It seems like every time [NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662] returns null and prints that the one broker "is neither ready for sending or connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]). I set the producer max.block.ms to be 5mins and you can see in the logs that this goes on and on for the full 5 minutes. During this time, the broker appears healthy (and even serves other requests). It seems that the client is not even attempting to reconnect to the broker during this 5 minutes, though I truthfully don't understand what could be going wrong looking through the code. Do you have any ideas? Any more information I could provide? was: I'm currently running into this problem running org.apache.kafka:kafka and org.apache.kafka:kafka-client version 2.5.1. The symptoms of this problem are very similar to https://issues.apache.org/jira/browse/KAFKA-8803 I'm currently running a test that runs one embedded Kafka broker and an embedded Flink cluster (version 1.13). The Flink application uses an exactly once Kafka producer. When initializing, the FlinkKafkaProducer calls [initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285] for each of ~50 underlying KafkaProducers. On some executions of the test (but not all of them), one of these calls to InitProducerId times out. Based on my reading of the logs and some debugging sessions, it seems that the failing producer continually tries to issue FindCoordinatorRequests, but [Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526] returns null. It seems like every time [NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662] returns null prints that the one broker "is neither ready for sending or connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]). I set the producer max.block.ms to be 5mins and you can see in the logs that this goes on and on for the full 5 minutes. During this time, the broker appears healthy (and even serves other requests). It seems that the client is not even attempting to reconnect to the broker during this 5 minutes, though I truthfully don't understand what could be going wrong looking through the code. Do you have any ideas? Any more information I could provide? > Exactly once producer cannot start due to TimeoutException: Timeout expired > after 30ms awaiting InitProducerId > -- > > Key: KAFKA-13901 > URL: https://issues.apache.org/jira/browse/KAFKA-13901 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.1 >Reporter: Ben Augarten >Priority: Major > Attachments: broker-logs-renamed-topics, client-logs-truncated > > > I'm currently running into this problem running org.apache.kafka:kafka and > org.apache.kafka:kafka-client version 2.5.1. > > The symptoms of this problem are very similar to >
[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods
[ https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Francesco Nigro updated KAFKA-13900: Description: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see [Java 9's Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]): Kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it for direct ByteBuffers as well (read-only or not), instead of performing a byte-per-byte computation. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. was: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see [Java 9's Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):] kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. > Support Java 9 direct ByteBuffer Checksum methods > - > > Key: KAFKA-13900 > URL: https://issues.apache.org/jira/browse/KAFKA-13900 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.1.1 >Reporter: Francesco Nigro >Priority: Minor > Labels: performance, performance-benchmark > > Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) > (see [Java 9's > Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]): > Kafka already provides specific support for Java 9's Cr32C, hence it makes > sense it's going to use the most optimized version of it for direct > ByteBuffers as well (read-only or not), instead of performing a byte-per-byte > computation. > > I'm aware that currently the client's Buffer pools aren't using direct > ByteBuffer, but having full support for it can open the door to future > interesting optimizations on it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13901) Exactly once producer cannot start due to TimeoutException: Timeout expired after 300000ms awaiting InitProducerId
Ben Augarten created KAFKA-13901: Summary: Exactly once producer cannot start due to TimeoutException: Timeout expired after 30ms awaiting InitProducerId Key: KAFKA-13901 URL: https://issues.apache.org/jira/browse/KAFKA-13901 Project: Kafka Issue Type: Bug Affects Versions: 2.5.1 Reporter: Ben Augarten Attachments: broker-logs-renamed-topics, client-logs-truncated I'm currently running into this problem running org.apache.kafka:kafka and org.apache.kafka:kafka-client version 2.5.1. The symptoms of this problem are very similar to https://issues.apache.org/jira/browse/KAFKA-8803 I'm currently running a test that runs one embedded Kafka broker and an embedded Flink cluster (version 1.13). The Flink application uses an exactly once Kafka producer. When initializing, the FlinkKafkaProducer calls [initTransactions|https://github.com/apache/flink/blob/be969dd73b533b03acaba1d81d03b29fccc54bfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1282-L1285] for each of ~50 underlying KafkaProducers. On some executions of the test (but not all of them), one of these calls to InitProducerId times out. Based on my reading of the logs and some debugging sessions, it seems that the failing producer continually tries to issue FindCoordinatorRequests, but [Sender.awaitNodeReady|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L511-L526] returns null. It seems like every time [NetworkClient.leastLoadedNode|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L662] returns null prints that the one broker "is neither ready for sending or connecting"([source|https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L692]). I set the producer max.block.ms to be 5mins and you can see in the logs that this goes on and on for the full 5 minutes. During this time, the broker appears healthy (and even serves other requests). It seems that the client is not even attempting to reconnect to the broker during this 5 minutes, though I truthfully don't understand what could be going wrong looking through the code. Do you have any ideas? Any more information I could provide? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods
[ https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Francesco Nigro updated KAFKA-13900: Description: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see [[Java 9's Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):] kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. was: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-): kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. > Support Java 9 direct ByteBuffer Checksum methods > - > > Key: KAFKA-13900 > URL: https://issues.apache.org/jira/browse/KAFKA-13900 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.1.1 >Reporter: Francesco Nigro >Priority: Minor > Labels: performance, performance-benchmark > > Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) > (see [[Java 9's > Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):] > kafka already provides specific support for Java 9's Cr32C, hence it makes > sense it's going to use the most optimized version of it, in case fed > ByteBuffer is direct (read-only or not), instead of performing a computation > byte-per-byte. > > I'm aware that currently the client's Buffer pools aren't using direct > ByteBuffer, but having full support for it can open the door to future > interesting optimizations on it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods
[ https://issues.apache.org/jira/browse/KAFKA-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Francesco Nigro updated KAFKA-13900: Description: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see [Java 9's Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):] kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. was: Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see [[Java 9's Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-):] kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. > Support Java 9 direct ByteBuffer Checksum methods > - > > Key: KAFKA-13900 > URL: https://issues.apache.org/jira/browse/KAFKA-13900 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.1.1 >Reporter: Francesco Nigro >Priority: Minor > Labels: performance, performance-benchmark > > Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) > (see [Java 9's > Checksum::update|https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-]):] > kafka already provides specific support for Java 9's Cr32C, hence it makes > sense it's going to use the most optimized version of it, in case fed > ByteBuffer is direct (read-only or not), instead of performing a computation > byte-per-byte. > > I'm aware that currently the client's Buffer pools aren't using direct > ByteBuffer, but having full support for it can open the door to future > interesting optimizations on it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13900) Support Java 9 direct ByteBuffer Checksum methods
Francesco Nigro created KAFKA-13900: --- Summary: Support Java 9 direct ByteBuffer Checksum methods Key: KAFKA-13900 URL: https://issues.apache.org/jira/browse/KAFKA-13900 Project: Kafka Issue Type: Improvement Components: clients, core Affects Versions: 3.1.1 Reporter: Francesco Nigro Java 9 has added a new Checksum method that can makes uses of ByteBuffer(s) (see https://docs.oracle.com/javase/9/docs/api/java/util/zip/Checksum.html#update-java.nio.ByteBuffer-): kafka already provides specific support for Java 9's Cr32C, hence it makes sense it's going to use the most optimized version of it, in case fed ByteBuffer is direct (read-only or not), instead of performing a computation byte-per-byte. I'm aware that currently the client's Buffer pools aren't using direct ByteBuffer, but having full support for it can open the door to future interesting optimizations on it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872646217 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ## @@ -248,80 +182,43 @@ public void process(final Record record) { } } -tryEmitFinalResult(record, windowCloseTime); +maybeMeasureEmitFinalLatency(record, windowCloseTime); } -private void tryEmitFinalResult(final Record record, final long windowCloseTime) { -if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { -return; -} - -final long now = internalProcessorContext.currentSystemTimeMs(); -// Throttle emit frequency as an optimization, the tradeoff is that we need to remember the -// window close time when we emitted last time so that we can restart from there in the next emit -if (now < timeTracker.nextTimeToEmit) { -return; -} - -// Schedule next emit time based on now to avoid the case that if system time jumps a lot, -// this can be triggered every time -timeTracker.nextTimeToEmit = now; -timeTracker.advanceNextTimeToEmit(); - -// Window close time has not progressed, there will be no windows to close hence no records to emit -if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitWindowCloseTime >= windowCloseTime) { +@Override +protected void maybeForwardFinalResult(final Record record, final long windowCloseTime) { +if (!shouldEmitFinal(windowCloseTime)) { return; } final long emitRangeUpperBoundInclusive = windowCloseTime - windows.size(); -// No window has ever closed and hence no need to emit any records if (emitRangeUpperBoundInclusive < 0) { +// If emitRangeUpperBoundInclusive is 0, it means first window closes since windowEndTime +// is exclusive return; } - -// Set emitRangeLowerBoundInclusive to -1L if lastEmitWindowCloseTime was not set so that -// we would fetch from 0L for the first time; otherwise set it to lastEmitWindowCloseTime - windows.size(). -// -// Note if we get here, it means emitRangeUpperBoundInclusive > 0, which means windowCloseTime > windows.size(), -// Because we always set lastEmitWindowCloseTime to windowCloseTime before, it means -// lastEmitWindowCloseTime - windows.size() should always > 0 -// As a result, emitRangeLowerBoundInclusive is always >= 0 +// Because we only get here when emitRangeUpperBoundInclusive > 0 which means closeTime > windows.size() +// Since we set lastEmitCloseTime to closeTime before storing to processor metadata +// lastEmitCloseTime - windows.size() is always > 0 +// Set emitRangeLowerBoundInclusive to -1L if not set so that when we fetchAll, we fetch from 0L final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? -1L : lastEmitWindowCloseTime - windows.size(); Review Comment: I've made some refactoring, LMK what do you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request, #12162: KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs
hachikuji opened a new pull request, #12162: URL: https://github.com/apache/kafka/pull/12162 In the AlterConfigs/IncrementalAlterConfigs zk handler, we return `INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in `LogConfig.validate`. We may either return `ConfigException` or `InvalidConfigException`. When the first of these is thrown, we catch it and convert to INVALID_REQUEST. If the latter is thrown, then we return `INVALID_CONFIG`. It seems more appropriate to return INVALID_CONFIG consistently. Note that the KRaft implementation already does this. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872636671 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java: ## @@ -467,13 +459,35 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp rightWinAgg, return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp; } +@Override +protected void maybeForwardFinalResult(final Record record, final long windowCloseTime) { +if (!shouldEmitFinal(windowCloseTime)) { +return; +} + +final long emitRangeUpperBoundExclusive = windowCloseTime - windows.timeDifferenceMs(); + +if (emitRangeUpperBoundExclusive <= 0) { +// Sliding window's start and end timestamps are inclusive, so +// the window is not closed if emitRangeUpperBoundExclusive is 0, +// and we shouldn't emit +return; +} + +final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? +0L : lastEmitWindowCloseTime - windows.timeDifferenceMs(); Review Comment: So far it seems `lastEmitWindowCloseTime` should always be no smaller than window size in either sliding or time windows, but when there's bugs it's possible that the read value from the processor metadata is small. I will update it accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y opened a new pull request, #12161: DRAFT: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y opened a new pull request, #12161: URL: https://github.com/apache/kafka/pull/12161 This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies. Added an integration test to show how pausing and resuming works. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andymg3 opened a new pull request, #12160: KAFKA-13889: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
andymg3 opened a new pull request, #12160: URL: https://github.com/apache/kafka/pull/12160 ### JIRA https://issues.apache.org/jira/browse/KAFKA-13889 ### Description - Fixes `AclsDelta` to handle `ACCESS_CONTROL_ENTRY_RECORD` quickly followed by `REMOVE_ACCESS_CONTROL_ENTRY_RECORD` for same ACL - As explained in the JIRA, in https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64 we store the pending deletion in the `changes` Map. This could override a creation that might have just happened. This is an issue because in `BrokerMetadataPublisher` this results in us making a `removeAcl` call which finally results in https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203 being executed and this code throws an exception if the ACL isnt in the Map yet. If the `ACCESS_CONTROL_ENTRY_RECORD` event never got processed by `BrokerMetadataPublisher` then the ACL wont be in the Map yet. - So the fix here is to remove the entry from the `changes` Map if the ACL doesnt exist in the image yet. ### Testing - Added unit tests for new behavior ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yingquan he closed KAFKA-13871. --- > The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the > RaftConfig class is incorrect > -- > > Key: KAFKA-13871 > URL: https://issues.apache.org/jira/browse/KAFKA-13871 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: yingquan he >Assignee: lqjacklee >Priority: Minor > > The syntax of the field QUORUM_FETCH_TIMEOUT_MS_DOC is incorrect. `a > election` should be changed to `an election`. > > {code:java} > public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time > without a successful fetch from " + > "the current leader before becoming a candidate and triggering a election > for voters; Maximum time without " + > "receiving fetch from a majority of the quorum before asking around to > see if there's a new epoch for leader"; {code} > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13889) Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
[ https://issues.apache.org/jira/browse/KAFKA-13889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-13889: - Summary: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL (was: Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL) > Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by > REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL > --- > > Key: KAFKA-13889 > URL: https://issues.apache.org/jira/browse/KAFKA-13889 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64] > we store the pending deletion in the changes map. This could override a > creation that might have just happened. This is an issue because in > BrokerMetadataPublisher this results in us making a removeAcl call which > finally results in > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203] > being executed and this code throws an exception if the ACL isnt in the Map > yet. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by > BrokerMetadataPublisher then the ACL wont be in the Map yet. > My feeling is we might want to make removeAcl idempotent in that it returns > success if the ACL doesn't exist: no matter how many times removeAcl is > called it returns success if the ACL is deleted. Maybe we’d just log a > warning or something? > Note, I dont think the AclControlManager has this issue because it doesn't > batch the events like AclsDelta does. However, we still do throw a > RuntimeException here > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L197] > - maybe we should still follow the same logic (if we make the fix suggested > above) and just log a warning if the ACL doesnt exist in the Map? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872607460 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872604769 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872603611 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872602138 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872601055 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872600795 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[jira] [Updated] (KAFKA-13889) Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
[ https://issues.apache.org/jira/browse/KAFKA-13889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-13889: - Summary: Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL (was: Broker can't handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL) > Update AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by > REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL > -- > > Key: KAFKA-13889 > URL: https://issues.apache.org/jira/browse/KAFKA-13889 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Priority: Major > > In > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64] > we store the pending deletion in the changes map. This could override a > creation that might have just happened. This is an issue because in > BrokerMetadataPublisher this results in us making a removeAcl call which > finally results in > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203] > being executed and this code throws an exception if the ACL isnt in the Map > yet. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by > BrokerMetadataPublisher then the ACL wont be in the Map yet. > My feeling is we might want to make removeAcl idempotent in that it returns > success if the ACL doesn't exist: no matter how many times removeAcl is > called it returns success if the ACL is deleted. Maybe we’d just log a > warning or something? > Note, I dont think the AclControlManager has this issue because it doesn't > batch the events like AclsDelta does. However, we still do throw a > RuntimeException here > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L197] > - maybe we should still follow the same logic (if we make the fix suggested > above) and just log a warning if the ACL doesnt exist in the Map? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872596894 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> { + +private final Time time = Time.SYSTEM; +private final String storeName; +private final EmitStrategy emitStrategy; +private final boolean sendOldValues; +protected final TimeTracker timeTracker = new TimeTracker(); + +private TimestampedTupleForwarder, VAgg> tupleForwarder; +protected TimestampedWindowStore windowStore; +protected Sensor droppedRecordsSensor; +protected Sensor emittedRecordsSensor; +protected Sensor emitFinalLatencySensor; +protected long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; +protected InternalProcessorContext, Change> internalProcessorContext; + +protected AbstractKStreamTimeWindowAggregateProcessor(final String storeName, + final EmitStrategy emitStrategy, + final boolean sendOldValues) { +this.storeName = storeName; +this.emitStrategy = emitStrategy; +this.sendOldValues = sendOldValues; +} + +@Override +public void init(final ProcessorContext, Change> context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext, Change>) context; +final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); +emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), +internalProcessorContext.currentNode().name(), metrics); +windowStore = context.getStateStore(storeName); + +if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { +// Restore last emit close time for ON_WINDOW_CLOSE strategy +final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); +
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r872596541 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -53,7 +55,7 @@ public class ClusterConfig { ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, - String ibp) { + String ibp, MetadataVersion metadataVersion) { Review Comment: Hm, actually I think I'll remove the ibp part from the annotation and just use MetadataVersion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872593623 ## streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java: ## @@ -19,15 +19,52 @@ import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; /** * This interface controls the strategy that can be used to control how we emit results in a processor. */ public interface EmitStrategy { +Logger log = LoggerFactory.getLogger(EmitStrategy.class); + enum StrategyType { -ON_WINDOW_CLOSE, -ON_WINDOW_UPDATE +ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()), +ON_WINDOW_CLOSE(1, new WindowCloseStrategy()); + +private final short code; +private final EmitStrategy strategy; + +private short code() { +return this.code; +} + +private EmitStrategy strategy() { +return this.strategy; +} + +StrategyType(final int code, final EmitStrategy strategy) { +this.code = (short) code; +this.strategy = strategy; +} + +private final static Map TYPE_TO_STRATEGY = new HashMap<>(); + +static { +for (final StrategyType type : StrategyType.values()) { +if (TYPE_TO_STRATEGY.put(type.code(), type.strategy()) != null) +throw new IllegalStateException("Code " + type.code() + " for type " + Review Comment: I was following the enum `Errors` as well to add this guard. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on code in PR #12135: URL: https://github.com/apache/kafka/pull/12135#discussion_r872593270 ## streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java: ## @@ -19,15 +19,52 @@ import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy; import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; /** * This interface controls the strategy that can be used to control how we emit results in a processor. */ public interface EmitStrategy { +Logger log = LoggerFactory.getLogger(EmitStrategy.class); + enum StrategyType { -ON_WINDOW_CLOSE, -ON_WINDOW_UPDATE +ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()), +ON_WINDOW_CLOSE(1, new WindowCloseStrategy()); + +private final short code; +private final EmitStrategy strategy; + +private short code() { +return this.code; +} + +private EmitStrategy strategy() { +return this.strategy; +} + +StrategyType(final int code, final EmitStrategy strategy) { Review Comment: This is because Java's default type is int -- i.e. line 36/37 above would take the value 0/1 as int. So we basically need to either do the conversion in each line of 36/37 above, or just do the conversion once here. I followed our other enums like `Errors` to do the conversion here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13899) Inconsistent error codes returned from AlterConfig APIs
Jason Gustafson created KAFKA-13899: --- Summary: Inconsistent error codes returned from AlterConfig APIs Key: KAFKA-13899 URL: https://issues.apache.org/jira/browse/KAFKA-13899 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson In the AlterConfigs/IncrementalAlterConfigs zk handler, we return INVALID_REQUEST and INVALID_CONFIG inconsistently. The problem is in `LogConfig.validate`. We may either return `ConfigException` or `InvalidConfigException`. When the first of these is thrown, we catch it and convert to INVALID_REQUEST. It seems more consistent to convert to INVALID_CONFIG. Note that the kraft implementation returns INVALID_CONFIG consistently. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536747#comment-17536747 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872567579 ## .htaccess: ## @@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2] RewriteRule ^/?(\d+)/images/ - [S=1] RewriteCond $2 !=protocol RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE] +RewriteCond %{REQUEST_FILENAME}.html -f +RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html Review Comment: Add a rewrite rule, it will serve `.html` if the HTML file exists. I do have a question, is this file being used in Production? If its not, then this change should be fine, if it is, then maybe we need someone that is familiar with it to review, I never operate httpd so I might be missing something > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536746#comment-17536746 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872567579 ## .htaccess: ## @@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2] RewriteRule ^/?(\d+)/images/ - [S=1] RewriteCond $2 !=protocol RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE] +RewriteCond %{REQUEST_FILENAME}.html -f +RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html Review Comment: Add a rewrite rule, it will serve `.html` if the HTML file exists. I do have a question, is this file being used in Production? > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536742#comment-17536742 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on PR #410: URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126221013 Hi @mimaison sorry for the back and forth, after some fiddling, I realize the problem is that when requesting `localhost:8080/path`, httpd tries to find a file named`path`, which does not exists, and what we really want is to resolve `path` to `path.html` in project root. I ended solving this by adding a rewrite rule in `.htaccess` file, but I don't know how is it solved in the real website deployment, is `.htaccess` actually used in production? > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-5074) Transition to OnlinePartition without preferred leader in ISR fails
[ https://issues.apache.org/jira/browse/KAFKA-5074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536721#comment-17536721 ] Paul Dubuc commented on KAFKA-5074: --- I see that this ticket is still open and unresolved. Is there a particular version of Kafka where this problem has been fixed? Or is there a workaround method for taking a node down that avoids this problem? Thanks. > Transition to OnlinePartition without preferred leader in ISR fails > --- > > Key: KAFKA-5074 > URL: https://issues.apache.org/jira/browse/KAFKA-5074 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0 >Reporter: Dustin Cote >Priority: Major > > Running 0.9.0.0, the controller can get into a state where it no longer is > able to elect a leader for an Offline partition. It's unclear how this state > is first achieved but in the steady state, this happens: > -There are partitions with a leader of -1 > -The Controller repeatedly attempts a preferred leader election for these > partitions > -The preferred leader election fails because the only replica in the ISR is > not the preferred leader > The log cycle looks like this: > {code} > [2017-04-12 18:00:18,891] INFO [Controller 8]: Starting preferred replica > leader election for partitions topic,1 > [2017-04-12 18:00:18,891] INFO [Partition state machine on Controller 8]: > Invoking state change to OnlinePartition for partitions topic,1 > [2017-04-12 18:00:18,892] INFO [PreferredReplicaPartitionLeaderSelector]: > Current leader -1 for partition [topic,1] is not the preferred replica. > Trigerring preferred replica leader election > (kafka.controller.PreferredReplicaPartitionLeaderSelector) > [2017-04-12 18:00:18,893] WARN [Controller 8]: Partition [topic,1] failed to > complete preferred replica leader election. Leader is -1 > (kafka.controller.KafkaController) > {code} > It's not clear if this would happen on versions later that 0.9.0.0. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536718#comment-17536718 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on PR #410: URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126166013 Woops, might be a regression, sorry, let me check, I will move this into draft in the mean time I agree, it is better to make it work for the whole site > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] fvaleri opened a new pull request, #12159: Fix stuck SSL tests in case of authentication failure
fvaleri opened a new pull request, #12159: URL: https://github.com/apache/kafka/pull/12159 When there is an authentication error after the initial TCP connection, the selector never becomes READY, and these tests wait forever waiting for this state. This is actually what happened to me while using an OpenJDK build that does not support the required cipher suites. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13898) metrics.recording.level is underdocumented
Tom Bentley created KAFKA-13898: --- Summary: metrics.recording.level is underdocumented Key: KAFKA-13898 URL: https://issues.apache.org/jira/browse/KAFKA-13898 Project: Kafka Issue Type: Improvement Components: docs Reporter: Tom Bentley metrics.recording.level is only briefly described in the documentation. In particular the recording level associated with each metric is not documented, which makes it difficult to know the effect of changing the level. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r872493845 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -927,6 +984,29 @@ private void appendRaftEvent(String name, Runnable runnable) { } } +/** + * A callback for changes to feature levels including metadata.version. This is called synchronously from + * {@link FeatureControlManager#replay(FeatureLevelRecord)} which is part of a ControllerWriteEvent. It is safe + * to modify controller state here. By the time this listener is called, a FeatureLevelRecord has been committed and + * the in-memory state of FeatureControlManager has been updated. + */ +class QuorumFeatureListener implements FeatureLevelListener { +@Override +public void handle(String featureName, short finalizedVersion) { Review Comment: If we need to react to a feature level change in one of the control managers, it will probably be easier to do it via QuorumController rather than adding dependencies to FeatureControlManager. But, we don't have any use cases yet so I guess we don't really need the listener thing yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Gerrrr commented on pull request #12122: WIP: Upgrade tests for KAFKA-13769
Ge commented on PR #12122: URL: https://github.com/apache/kafka/pull/12122#issuecomment-1126143043 Green CI run with system tests - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4897/. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536685#comment-17536685 ] ASF GitHub Bot commented on KAFKA-13882: mimaison commented on PR #410: URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1126118797 When I run this locally, I can only access pages under `Docs`, for anything else I get 404. Is it expected? I would be nice to preview the complete website. > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] joel-hamill commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers
joel-hamill commented on code in PR #12153: URL: https://github.com/apache/kafka/pull/12153#discussion_r872430394 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -894,7 +894,7 @@ object KafkaConfig { "record batch size accepted by the broker is defined via message.max.bytes (broker config) or " + "max.message.bytes (topic config)." val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " + Review Comment: i have updated the PR https://github.com/apache/kafka/pull/12153/commits/0957f0881c7638c533ff2f0ffbc083f58448bc79 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] joel-hamill commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers
joel-hamill commented on code in PR #12153: URL: https://github.com/apache/kafka/pull/12153#discussion_r872429806 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -894,7 +894,7 @@ object KafkaConfig { "record batch size accepted by the broker is defined via message.max.bytes (broker config) or " + "max.message.bytes (topic config)." val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " + - "Increasing this value can increase the degree of I/O parallelism in the follower broker." + "Increasing this value can increase the degree of I/O parallelism in the follower and leader broker at the cost of higher CPU utilization." Review Comment: okay i have updated based on the feedback below from @hachikuji -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r872423670 ## metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java: ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.controller.util.SnapshotFileReader; +import org.apache.kafka.controller.util.SnapshotFileWriter; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. + */ +public class BootstrapMetadata { +private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class); + +public static final String BOOTSTRAP_FILE = "bootstrap.snapshot"; + +private final MetadataVersion metadataVersion; + +private final List records; + +BootstrapMetadata(MetadataVersion metadataVersion, List records) { +this.metadataVersion = metadataVersion; +this.records = Collections.unmodifiableList(records); +} + +public MetadataVersion metadataVersion() { +return this.metadataVersion; +} + +public List records() { +return records; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +BootstrapMetadata metadata = (BootstrapMetadata) o; +return metadataVersion == metadata.metadataVersion; +} + +@Override +public int hashCode() { +return Objects.hash(metadataVersion); +} + +@Override +public String toString() { +return "BootstrapMetadata{" + +"metadataVersion=" + metadataVersion + +'}'; +} + +/** + * A raft client listener that simply collects all of the commits and snapshots into a mapping of + * metadata record type to list of records. + */ +private static class BootstrapListener implements RaftClient.Listener { +private final List records = new ArrayList<>(); + +@Override +public void handleCommit(BatchReader reader) { +try { +while (reader.hasNext()) { +Batch batch = reader.next(); +records.addAll(batch.records()); +} +} finally { +reader.close(); +} +} + +@Override +public void handleSnapshot(SnapshotReader reader) { +try { +while (reader.hasNext()) { +Batch batch = reader.next(); +for (ApiMessageAndVersion messageAndVersion : batch) { +records.add(messageAndVersion); +} +} +} finally { +reader.close(); +} +} +} + +public static BootstrapMetadata create(MetadataVersion metadataVersion) { +return create(metadataVersion, new ArrayList<>()); +} + +public static BootstrapMetadata create(MetadataVersion metadataVersion, List records) { +if (!metadataVersion.isKRaftSupported()) { +throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version."); +} +records.add(new ApiMessageAndVersion( +new FeatureLevelRecord() +
[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r872417278 ## metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java: ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.controller.util.SnapshotFileReader; +import org.apache.kafka.controller.util.SnapshotFileWriter; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. + */ +public class BootstrapMetadata { +private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class); + +public static final String BOOTSTRAP_FILE = "bootstrap.snapshot"; + +private final MetadataVersion metadataVersion; + +private final List records; + +BootstrapMetadata(MetadataVersion metadataVersion, List records) { +this.metadataVersion = metadataVersion; +this.records = Collections.unmodifiableList(records); +} + +public MetadataVersion metadataVersion() { +return this.metadataVersion; +} + +public List records() { +return records; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +BootstrapMetadata metadata = (BootstrapMetadata) o; +return metadataVersion == metadata.metadataVersion; +} + +@Override +public int hashCode() { +return Objects.hash(metadataVersion); +} + +@Override +public String toString() { +return "BootstrapMetadata{" + +"metadataVersion=" + metadataVersion + +'}'; +} + +/** + * A raft client listener that simply collects all of the commits and snapshots into a mapping of + * metadata record type to list of records. + */ +private static class BootstrapListener implements RaftClient.Listener { +private final List records = new ArrayList<>(); + +@Override +public void handleCommit(BatchReader reader) { +try { +while (reader.hasNext()) { +Batch batch = reader.next(); +records.addAll(batch.records()); +} +} finally { +reader.close(); +} +} + +@Override +public void handleSnapshot(SnapshotReader reader) { +try { +while (reader.hasNext()) { +Batch batch = reader.next(); +for (ApiMessageAndVersion messageAndVersion : batch) { +records.add(messageAndVersion); +} +} +} finally { +reader.close(); +} +} +} + +public static BootstrapMetadata create(MetadataVersion metadataVersion) { +return create(metadataVersion, new ArrayList<>()); +} + +public static BootstrapMetadata create(MetadataVersion metadataVersion, List records) { +if (!metadataVersion.isKRaftSupported()) { +throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version."); +} +records.add(new ApiMessageAndVersion( +new FeatureLevelRecord() +
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536616#comment-17536616 ] ASF GitHub Bot commented on KAFKA-13882: mimaison commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872351993 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: If you know a better way to achieve the same, yes do it > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536583#comment-17536583 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: ~Also, can you explain what `read -r -d '' _ Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536584#comment-17536584 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: ~Also, can you explain what `read -r -d '' _ Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536574#comment-17536574 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872289325 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: Also, can you explain what `read -r -d '' _ Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536573#comment-17536573 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872286879 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: Hi, thanks for the suggestion, I am not a shell expert so I might be missing something here, I think we achieve similar effect by not running the container in detached mode. Then ctrl + c will work, do you think that's acceptable? > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13897) Add 3.1.1 to system tests and streams upgrade tests
Tom Bentley created KAFKA-13897: --- Summary: Add 3.1.1 to system tests and streams upgrade tests Key: KAFKA-13897 URL: https://issues.apache.org/jira/browse/KAFKA-13897 Project: Kafka Issue Type: Task Components: streams, system tests Reporter: Tom Bentley Fix For: 3.3.0, 3.1.2, 3.2.1 Per the penultimate bullet on the [release checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses], Kafka v3.1.1 is released. We should add this version to the system tests. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536544#comment-17536544 ] ASF GitHub Bot commented on KAFKA-13882: mimaison commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872225024 ## start-preview.sh: ## @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +docker build -t kafka-site-preview . + +docker run -dit --rm --name mypreview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview + +echo "You can stop the preview server by running `docker stop mypreview`" Review Comment: What about doing something like this to make it even easier to use? ```suggestion #!/usr/bin/env bash set -euo pipefail clean() { docker stop site-preview exit 0 } trap clean SIGINT docker build -t kafka-site-preview . docker run -dit --rm --name site-preview -p 8080:80 -v "$PWD":/usr/local/apache2/htdocs/ kafka-site-preview echo "The preview is available on http://localhost:8080.; echo "Press CTRL+C to stop it." read -r -d '' _ Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] bozhao12 closed pull request #12022: MINOR:Remove repeated variable reset in KafkaController.scala
bozhao12 closed pull request #12022: MINOR:Remove repeated variable reset in KafkaController.scala URL: https://github.com/apache/kafka/pull/12022 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bozhao12 opened a new pull request, #12158: MINOR:A few code cleanUps in KafkaController
bozhao12 opened a new pull request, #12158: URL: https://github.com/apache/kafka/pull/12158 Following variables in kafkaController are used for metric statistics: ``` offlinePartitionCount preferredReplicaImbalanceCount globalTopicCount globalPartitionCount topicsToDeleteCount replicasToDeleteCount ineligibleTopicsToDeleteCount ineligibleReplicasToDeleteCount ``` When Controller goes from active to non-active, these variables will be reset to 0. Currently, we will perform reset operations in `KafkaController.onControllerResignation() `and `KafkaController.updateMetrics()` . in fact, whether it is an active controller or a non-active controller, as long as it receives events related to controller change, The method` KafkaController.updateMetrics()` will be executed, and decide whether to reset the above variables. So the reset operations in `KafkaController.onControllerResignation() ` can actually be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12549) Allow state stores to opt-in transactional support
[ https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Sorokoumov reassigned KAFKA-12549: --- Assignee: Alex Sorokoumov > Allow state stores to opt-in transactional support > -- > > Key: KAFKA-12549 > URL: https://issues.apache.org/jira/browse/KAFKA-12549 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Assignee: Alex Sorokoumov >Priority: Major > > Right now Kafka Stream's EOS implementation does not make any assumptions > about the state store's transactional support. Allowing the state stores to > optionally provide transactional support can have multiple benefits. E.g., if > we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, > {{commitTxn}} and {{abortTxn}}. Streams library can determine if these are > supported via an additional {{boolean transactional()}} API, and if yes the > these APIs can be used under both ALOS and EOS like the following (otherwise > then just fallback to the normal processing logic): > Within thread processing loops: > 1. store.beginTxn > 2. store.put // during processing > 3. streams commit // either through eos protocol or not > 4. store.commitTxn > 5. start the next txn by store.beginTxn > If the state stores allow Streams to do something like above, we can have the > following benefits: > * Reduce the duplicated records upon crashes for ALOS (note this is not EOS > still, but some middle-ground where uncommitted data within a state store > would not be retained if store.commitTxn failed). > * No need to wipe the state store and re-bootstrap from scratch upon crashes > for EOS. E.g., if a crash-failure happened between streams commit completes > and store.commitTxn. We can instead just roll-forward the transaction by > replaying the changelog from the second recent streams committed offset > towards the most recent committed offset. > * Remote stores that support txn then do not need to support wiping > (https://issues.apache.org/jira/browse/KAFKA-12475). > * We can fix the known issues of emit-on-change > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams). > * We can support "query committed data only" for interactive queries (see > below for reasons). > As for the implementation of these APIs, there are several options: > * The state store itself have natural transaction features (e.g. RocksDB). > * Use an in-memory buffer for all puts within a transaction, and upon > `commitTxn` write the whole buffer as a batch to the underlying state store, > or just drop the whole buffer upon aborting. Then for interactive queries, > one can optionally only query the underlying store for committed data only. > * Use a separate store as the transient persistent buffer. Upon `beginTxn` > create a new empty transient store, and upon `commitTxn` merge the store into > the underlying store. Same applies for interactive querying committed-only > data. This has a benefit compared with the one above that there's no memory > pressure even with long transactions, but incurs more complexity / > performance overhead with the separate persistent store. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica
[ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536522#comment-17536522 ] dengziming commented on KAFKA-9837: --- [~soarez] Thank you for your proposals, I think this is what we are thinking about, let the controller recording which directory hosts each replica, we can avoid sending an O(n) request. But I think the name ASSIGN_REPLICAS_TO_FAIL_GROUP and FAIL_GROUP is not clear, how about ASSIGN_REPLICAS_TO_DIRECTORY, FAIL_DIRECTORY. btw, we have decided to mark KRaft as production soon in KIP-833, so this KIP will be useful and important. > New RPC for notifying controller of failed replica > -- > > Key: KAFKA-9837 > URL: https://issues.apache.org/jira/browse/KAFKA-9837 > Project: Kafka > Issue Type: New Feature > Components: controller, core >Reporter: David Arthur >Assignee: dengziming >Priority: Major > Labels: kip-500 > > This is the tracking ticket for > [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller]. > For the bridge release, brokers should no longer use ZooKeeper to notify the > controller that a log dir has failed. It should instead use an RPC mechanism. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12153: MINOR: Clarify impact of num.replica.fetchers
divijvaidya commented on code in PR #12153: URL: https://github.com/apache/kafka/pull/12153#discussion_r872165577 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -894,7 +894,7 @@ object KafkaConfig { "record batch size accepted by the broker is defined via message.max.bytes (broker config) or " + "max.message.bytes (topic config)." val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " + - "Increasing this value can increase the degree of I/O parallelism in the follower broker." + "Increasing this value can increase the degree of I/O parallelism in the follower and leader broker at the cost of higher CPU utilization." Review Comment: Sure. Can't argue against actual user anecdotes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12153: MINOR: Clarify impact of num.replica.fetchers
divijvaidya commented on PR #12153: URL: https://github.com/apache/kafka/pull/12153#issuecomment-1125823580 @hachikuji one of the flaky failing tests in this PR is `Build / JDK 11 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest ` I have fixed that test at https://github.com/apache/kafka/pull/12045 Please take a look when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536512#comment-17536512 ] ASF GitHub Bot commented on KAFKA-13882: qingwei91 commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r872153458 ## README.md: ## @@ -0,0 +1,11 @@ +# How to preview the documentation changes locally? + +The documentation can hosted on a local webserver via httpd. + +You can run it with the following command, note that it requires docker: + +```shell +sh start-preview.sh Review Comment: Got it, I've updated the file permission and the readme > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] tombentley merged pull request #12156: MINOR: Update release versions for upgrade tests with 3.1.1 release
tombentley merged PR #12156: URL: https://github.com/apache/kafka/pull/12156 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming opened a new pull request, #12157: MINOR: Support co-resident mode in KRaft TestKit
dengziming opened a new pull request, #12157: URL: https://github.com/apache/kafka/pull/12157 *More detailed description of your change* The behavior between co-resident mode and discrete mode is different, so we should support co-resident mode too. I also find a bug in co-resident, see KAFKA-13228. *Summary of testing strategy (including rationale)* Changed existing clusterTest to support co-resident mode except for ApiVersionsRequestTest, which is related to a bug. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org