[GitHub] [kafka] Fleshgrinder edited a comment on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials
Fleshgrinder edited a comment on pull request #9374: URL: https://github.com/apache/kafka/pull/9374#issuecomment-704057098 @chia7712 the server gets null right now, and the code you propose is more or less what I had originally. However, at least @rondagostino was directly confused by the ternary and its precedence, which is why we rewrote it to be easier to understand and easier to debug (now it's possible to set individual breakpoints). Modern languages like Kotlin and Rust have no ternary for a good reason. 🙂 Imho it's better as is, despite being longer, especially because length doesn't translate directly to complexity. We'd also still be collecting `null` usernames. The field is nullable in the `UserName` class but collecting them is still a bad idea. I need to check the server side. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Fleshgrinder commented on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials
Fleshgrinder commented on pull request #9374: URL: https://github.com/apache/kafka/pull/9374#issuecomment-704057098 @chia7712 the server gets null right now, and the code you propose is more or less what I had originally. However, at least @rondagostino was directly confused by the ternary and its precedence, which is why we rewrote it to be easier to understand and easier to debug (now it's possible to set individual breakpoints). Modern languages like Kotlin and Rust have no ternary for a good reason. 🙂 Imho it's better as is, despite being longer, especially because length doesn't translate directly to complexity. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
lkokhreidze commented on a change in pull request #9237: URL: https://github.com/apache/kafka/pull/9237#discussion_r500023844 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(value = Parameterized.class) +@Category({IntegrationTest.class}) +public class StreamTableJoinTopologyOptimizationIntegrationTest { +private static final int NUM_BROKERS = 1; + +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + +private String tableTopic; +private String inputTopic; +private String outputTopic; +private String applicationId; + +private Properties streamsConfiguration; + +@Rule +public TestName testName = new TestName(); + +@Parameterized.Parameter +public String topologyOptimization; + +@Parameterized.Parameters(name = "Optimization = {0}") +public static Collection topologyOptimization() { +return Arrays.asList(new String[][]{ +{StreamsConfig.OPTIMIZE}, +{StreamsConfig.NO_OPTIMIZATION} +}); +} + +@Before +public void before() throws InterruptedException { +streamsConfiguration = new Properties(); + +final String safeTestName = safeUniqueTestName(getClass(), testName); + +tableTopic = "table-topic" + safeTestName; +inputTopic = "stream-topic-" + safeTestName; +outputTopic = "output-topic-" + safeTestName; +applicationId = "app-" + safeTestName; + +CLUSTER.createTopic(inputTopic, 4, 1); +CLUSTER.createTopic(tableTopic, 2, 1); +CLUSTER.createTopic(outputTopic, 4, 1); + +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); +streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); +streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BY
[GitHub] [kafka] chia7712 commented on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials
chia7712 commented on pull request #9374: URL: https://github.com/apache/kafka/pull/9374#issuecomment-704046977 Nice finding! Is there also a potential NPE on the server side? According to the protocol schema (https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json#L23), the field ```users``` is nullable. Does the server have to handle null as well? (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3068). Also, if the server is able to handle null, it should be fine to set ```users``` to null. Hence, we can simplify the client code in the following style. ```java @Override public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { return new DescribeUserScramCredentialsRequest.Builder( new DescribeUserScramCredentialsRequestData().setUsers(users == null ? null : users.stream().map(user -> new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))); } ``` please let me know what you think :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8318) Session Window Aggregations generate an extra tombstone
[ https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208497#comment-17208497 ] Ilia Pasynkov commented on KAFKA-8318: -- Hello, [~vvcephei]. Right now I'm looking at SessionWindowedKStreamImpl and SuppressScenarioTest classes. Could you please help me to figure out, what "tombstone" is exactly meant by this issue? > Session Window Aggregations generate an extra tombstone > --- > > Key: KAFKA-8318 > URL: https://issues.apache.org/jira/browse/KAFKA-8318 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Ilia Pasynkov >Priority: Minor > Labels: newbie++ > > See the discussion > https://github.com/apache/kafka/pull/6654#discussion_r280231439 > The session merging logic generates a tombstone in addition to an update when > the session window already exists. It's not a correctness issue, just a small > performance hit, because that tombstone is immediately invalidated by the > update. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads
guozhangwang commented on a change in pull request #8988: URL: https://github.com/apache/kafka/pull/8988#discussion_r51027 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -659,13 +665,12 @@ void runOnce() { } } -// we can always let changelog reader try restoring in order to initialize the changelogs; -// if there's no active restoring or standby updating it would not try to fetch any data -changelogReader.restore(); - -// TODO: we should record the restore latency and its relative time spent ratio after -// we figure out how to move this method out of the stream thread -advanceNowAndComputeLatency(); +// check if restore thread has encountered TaskCorrupted exception; if yes +// rethrow it to trigger the handling logic +final TaskCorruptedException e = restoreThread.nextCorruptedException(); Review comment: Updating the fields of TaskCorruptedException could be risky since it can be read by the other main thread concurrently. I think a better way would be still keeping its field as immutable, but drain all the exceptions (which is thread-safe) and then create a new one aggregating its tasks. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -115,13 +114,16 @@ public boolean isClosed() { @Override public void revive() { if (state == CLOSED) { +// clear all the stores since they should be re-registered Review comment: `ProcessorStateManager#changelogPartitions` relies on its `changelogOffsets` which relies on the `stores` map. If `stores` map gets cleared, then `changelogPartitions` would return nothing. So in order to get its changelog partitions to send to the restore thread, we need to get them first before clearing the `stores` map, i.e. we need to "materialize" that `changelogPartitions` map first --- maybe we did not use the right term here, sorry. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -623,16 +613,46 @@ void runOnce() { return; } -initializeAndRestorePhase(); +// we need to first add closed tasks and then created tasks to work with those revived / recycled tasks +restoreThread.addClosedTasks(taskManager.drainRemovedTasks()); + +// try to initialize created tasks that are either newly assigned or re-created from corrupted tasks +final List initializedTasks; +if (!(initializedTasks = taskManager.tryInitializeNewTasks()).isEmpty()) { +if (log.isDebugEnabled()) { +log.debug("Initializing newly created tasks {} under state {}", + initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), state); +} + Review comment: Ack. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atom
[jira] [Updated] (KAFKA-10577) StreamThread should be able to process any processible tasks regardless of its state
[ https://issues.apache.org/jira/browse/KAFKA-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10577: Component/s: streams > StreamThread should be able to process any processible tasks regardless of > its state > > > Key: KAFKA-10577 > URL: https://issues.apache.org/jira/browse/KAFKA-10577 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > After KAFKA-10199 is done, we should allow active tasks processing even if we > are not yet in RUNNING. More generally speaking, we would no longer rely on > the thread's RUNNING state to start processing any tasks, but would just > always process any processible tasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10555) Improve client state machine
[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208492#comment-17208492 ] Matthias J. Sax commented on KAFKA-10555: - {quote}I thought we were only considering to transit to ERROR if the last thread died, but to transit to NOT_RUNNING if the last thread was removed by the user. This seems consistent with the current behavior and maintains the same semantic meaning of the ERROR state, imo. {quote} This would be the state after the KIP (without addressing this ticket). Transiting to NOT_RUNNING might be an option, but it would also be a change to the state machine, as currently, NOT_RUNNING is a terminal state after the client was closed. Thus, it won't be possible to add new threads when in NOT_RUNNING state following the current proposal of the KIP. However, I don't agree that it makes sense to go to ERROR state when the last thread dies _and_ to disallow adding new threads when in ERROR state. IMHO, there are two options: # go to ERROR state when any thread dies and disallow to add/remove threads for this case (as if a thread dies, something went wrong and we want to "lock" the client). # go to ERROR state only when the last thread dies, but allow to add new threads and thus allow to transit from ERROR back to RUNNING (via REBALANCING of course); for this case, ERROR means that we stopped processing due to an error; for this semantic interpretation of ERROR state, there is no reason to not allow adding new threads IMHO (in contrast to (1) for which we say, something bad happens we want to lock the client as we think it's unsafe to add/remove threads any longer). I personally prefer (2) over (1), as I don't think that there is a good reason to lock down the client after a thread dies (also not, after the last thread died). 
Also note, even if we stay in RUNNING state with zero threads, it might be ok, as users can consult `localThreadMetadata` and/or the `num-thread-alive` metric to inspect if there are any running threads. Ie, stopping the last running thread via `removeThread()` could be the same as if the last thread just died. > Improve client state machine > > > Key: KAFKA-10555 > URL: https://issues.apache.org/jira/browse/KAFKA-10555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > The KafkaStreams client exposes its state to the user for monitoring purpose > (ie, RUNNING, REBALANCING etc). The state of the client depends on the > state(s) of the internal StreamThreads that have their own states. > Furthermore, the client state has impact on what the user can do with the > client. For example, active task can only be queried in RUNNING state and > similar. > With KIP-671 and KIP-663 we improved error handling capabilities and allow to > add/remove stream thread dynamically. We allow adding/removing threads only > in RUNNING and REBALANCING state. This puts us in a "weird" position, because > if we enter ERROR state (ie, if the last thread dies), we cannot add new > threads any longer. However, if we have multiple threads and one dies, we > don't enter ERROR state and do allow to recover the thread. > Before the KIPs the definition of ERROR state was clear, however, with both > KIPs it seems that we should revisit its semantics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10577) StreamThread should be able to process any processible tasks regardless of its state
Guozhang Wang created KAFKA-10577: - Summary: StreamThread should be able to process any processible tasks regardless of its state Key: KAFKA-10577 URL: https://issues.apache.org/jira/browse/KAFKA-10577 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang After KAFKA-10199 is done, we should allow active tasks processing even if we are not yet in RUNNING. More generally speaking, we would no longer rely on the thread's RUNNING state to start processing any tasks, but would just always process any processible tasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mikebin commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores
mikebin commented on pull request #9373: URL: https://github.com/apache/kafka/pull/9373#issuecomment-704032434 Thanks for the review @ableegoldman! Added a unit test. And thanks @vvcephei for making it easier to cherry pick this fix back to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10576) Different behavior of commitSync and commitAsync
[ https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuriy Badalyantc updated KAFKA-10576: - Description: It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic. {code:java} public class TestKafka { public static void main(String[]args) { String id = "dev_test"; Map settings = new HashMap<>(); settings.put("bootstrap.servers", "localhost:9094"); settings.put("key.deserializer", StringDeserializer.class); settings.put("value.deserializer", StringDeserializer.class); settings.put("client.id", id); settings.put("group.id", id); String topic = "test"; Map offsets = new HashMap<>(); offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1)); try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { consumer.commitSync(offsets); } } } {code} In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka. But in the case of {{commitAsync}} it will not work: {code:java} try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { CompletableFuture result = new CompletableFuture<>(); consumer.commitAsync(offsets, new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception != null) { result.completeExceptionally(exception); } else { result.complete(true); } } }); result.get(15L, TimeUnit.SECONDS); } {code} The {{result}} future failed with a timeout. This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences. I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page. 
So, I believe that there are the next options: # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics. # It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented. was: It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic. {code:java} public class TestKafka { public static void main(String[]args) { String id = "dev_test"; Map settings = new HashMap<>(); settings.put("bootstrap.servers", "localhost:9094"); settings.put("key.deserializer", StringDeserializer.class); settings.put("value.deserializer", StringDeserializer.class); settings.put("client.id", id); settings.put("group.id", id); String topic = "test"; Map offsets = new HashMap<>(); offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1)); try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { consumer.commitSync(offsets); } } } {code} In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected â all offsets are committed to kafka. But in the case of {{commitAsync}} it will not work: {code:java} try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { CompletableFuture result = new CompletableFuture<>(); consumer.commitAsync(offsets, new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception != null) { result.completeExceptionally(exception); } else { result.complete(true); } } }); result.get(15L, TimeUnit.SECONDS); } {code} The {{result}} future failed with a timeout. This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences. I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. 
But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page. So, I believe that there are the next options: # It's a but and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics. # It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented. > Different behavior of commitSync and commitAsync > > > Key: KAFKA-10576 > URL: h
[jira] [Created] (KAFKA-10576) Different behavior of commitSync and commitAsync
Yuriy Badalyantc created KAFKA-10576: Summary: Different behavior of commitSync and commitAsync Key: KAFKA-10576 URL: https://issues.apache.org/jira/browse/KAFKA-10576 Project: Kafka Issue Type: Bug Components: consumer Reporter: Yuriy Badalyantc It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic. {code:java} public class TestKafka { public static void main(String[]args) { String id = "dev_test"; Map settings = new HashMap<>(); settings.put("bootstrap.servers", "localhost:9094"); settings.put("key.deserializer", StringDeserializer.class); settings.put("value.deserializer", StringDeserializer.class); settings.put("client.id", id); settings.put("group.id", id); String topic = "test"; Map offsets = new HashMap<>(); offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1)); try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { consumer.commitSync(offsets); } } } {code} In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka. But in the case of {{commitAsync}} it will not work: {code:java} try (KafkaConsumer consumer = new KafkaConsumer<>(settings)) { CompletableFuture result = new CompletableFuture<>(); consumer.commitAsync(offsets, new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception != null) { result.completeExceptionally(exception); } else { result.complete(true); } } }); result.get(15L, TimeUnit.SECONDS); } {code} The {{result}} future failed with a timeout. This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences. I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. 
But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page. So, I believe that there are the next options: # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics. # It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
ableegoldman commented on a change in pull request #9139: URL: https://github.com/apache/kafka/pull/9139#discussion_r46879 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ## @@ -359,7 +431,11 @@ private void getNextSegmentIterator() { setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); current.close(); -current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); +if (forward) { +current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); +} else { +current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); +} } private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { Review comment: Can you fix the `keyFrom == keyTo` to use `.equals` on the side (down on line 370) ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ## @@ -359,7 +431,11 @@ private void getNextSegmentIterator() { setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); current.close(); -current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); +if (forward) { Review comment: I think we're going to need some additional changes in this class similar to what we had in CachingWindowStore. Definitely at least in `getNextSegmentIterator()`. 
Let's make sure to have some cross-segment test coverage here as well, especially because the iteration logic of session store range queries is the hardest to wrap your head around out of all the stores (at least, it is for me) ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java ## @@ -201,7 +247,26 @@ public void remove(final Windowed sessionKey) { removeExpiredSegments(); -return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); +return registerNewIterator( +key, +key, +Long.MAX_VALUE, endTimeMap.entrySet().iterator(), Review comment: missing newline ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java ## @@ -382,9 +478,20 @@ private boolean setInnerIterators() { currentKey = nextKeyEntry.getKey(); if (latestSessionStartTime == Long.MAX_VALUE) { -recordIterator = nextKeyEntry.getValue().entrySet().iterator(); +final Set> entries; +if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet(); +else entries = nextKeyEntry.getValue().entrySet(); +recordIterator = entries.iterator(); } else { -recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator(); +final Set> entries; +if (forward) entries = nextKeyEntry.getValue() Review comment: If/else needs brackets This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
Guozhang Wang created KAFKA-10575: - Summary: StateRestoreListener#onRestoreEnd should always be triggered Key: KAFKA-10575 URL: https://issues.apache.org/jira/browse/KAFKA-10575 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the restoration of an active task and transit it to the running state. However the restoration can also be stopped when the restoring task gets closed (because it gets migrated to another client, for example). We should also trigger the callback indicating its progress when the restoration is stopped in any scenario. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores
ableegoldman commented on pull request #9373: URL: https://github.com/apache/kafka/pull/9373#issuecomment-704010744 By the way, John was so kind as to cherrypick the trunk PR that rearranged things in this method back to the 2.6 branch -- so we should be able to cherrypick this PR smoothly, no need for a separate one for 2.6 Looks like just a handful of unrelated flaky test failures in the builds, hopefully we'll have better luck on the next run `` Build / JDK 11 / kafka.api.MetricsTest.testMetrics Build / JDK 8 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault Build / JDK 15 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup Build / JDK 15 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete Build / JDK 15 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable `` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman edited a comment on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores
ableegoldman edited a comment on pull request #9373: URL: https://github.com/apache/kafka/pull/9373#issuecomment-704010744 By the way, John was so kind as to cherrypick the trunk PR that rearranged things in this method back to the 2.6 branch -- so we should be able to cherrypick this PR smoothly, no need for a separate one for 2.6 Looks like just a handful of unrelated flaky test failures in the builds, hopefully we'll have better luck on the next run ``` Build / JDK 11 / kafka.api.MetricsTest.testMetrics Build / JDK 8 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault Build / JDK 15 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup Build / JDK 15 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete Build / JDK 15 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads
guozhangwang commented on a change in pull request #8988: URL: https://github.com/apache/kafka/pull/8988#discussion_r499988582 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + + +/** + * This is the thread responsible for restoring state stores for both active and standby tasks + */ +public class StateRestoreThread extends Thread { + +private final Time time; +private final Logger log; +private final ChangelogReader changelogReader; +private final AtomicBoolean isRunning = new AtomicBoolean(true); +private final CountDownLatch shutdownLatch = new CountDownLatch(1); +private final LinkedBlockingDeque taskItemQueue; +private final AtomicReference> completedChangelogs; +private final LinkedBlockingDeque corruptedExceptions; + +public boolean isRunning() { +return isRunning.get(); +} + +public StateRestoreThread(final Time time, + final StreamsConfig config, + final String threadClientId, + final Admin adminClient, + final String groupId, + final Consumer restoreConsumer, + final StateRestoreListener userStateRestoreListener) { +this(time, 
threadClientId, new StoreChangelogReader(time, config, threadClientId, +adminClient, groupId, restoreConsumer, userStateRestoreListener)); +} + +// for testing only +public StateRestoreThread(final Time time, + final String threadClientId, + final ChangelogReader changelogReader) { +super(threadClientId); + +final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId); +final LogContext logContext = new LogContext(logPrefix); + +this.time = time; +this.log = logContext.logger(getClass()); +this.taskItemQueue = new LinkedBlockingDeque<>(); +this.corruptedExceptions = new LinkedBlockingDeque<>(); +this.completedChangelogs = new AtomicReference<>(Collections.emptySet()); + +this.changelogReader = changelogReader; +} + +private synchronized void waitIfAllChangelogsCompleted() { +final Set allChangelogs = changelogReader.allChangelogs(); +if (allChangelogs.equals(changelogReader.completedChangelogs())) { +log.debug("All changelogs {} have completed restoration so far, will wait " + +"until new changelogs are registered", allChangelogs); + +while (isRunning.get() && taskItemQueue.isEmpty()) { +try { +wait(); +} catch (final InterruptedException e) { +// do nothing +} +} +} +} + +public synchronized void addInitializedTasks(final List tasks) { +if (!tasks.isEmpty()) { +for (fi
[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore
vvcephei commented on a change in pull request #9368: URL: https://github.com/apache/kafka/pull/9368#discussion_r499812565 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ## @@ -205,4 +207,9 @@ default boolean maybePunctuateSystemTime() { return false; } +void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, + final TimeoutException timeoutException, + final Logger log) throws StreamsException; Review comment: StreamsException is unchecked, right? It's better to document unchecked exceptions in the `@throws` javadoc tag. The `throws` keyword is for telling the compiler that you want callers instead of yourself to handle a _checked_ exception. I honestly have no idea why the java team chose to say "this is poor style" instead of just making it a compiler error, but that's the rationale. https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html#throwstag ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -431,6 +432,9 @@ public void restore() { // in order to make sure we call the main consumer#poll in time. // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime); + +// TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ? +// TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ? Review comment: Thanks. I agree we can afford to leave this for future improvements. It does seem like we should have some kind of improvement in the future, though. Having a restore or standby-update fail indefinitely would be just as damaging to an application's robustness as having the main consumer fail indefinitely. 
Perhaps we can make some improvements to the Consumer first, though, so that we don't have to do so much guesswork to distinguish between "no records" and "no fetch". ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -137,4 +146,48 @@ public void update(final Set topicPartitions, final Map deadlineMs) { +final String errorMessage = String.format( +"Task %s did not make progress within %d ms. Adjust `%s` if needed.", +id, +currentWallClockMs - deadlineMs + taskTimeoutMs, +StreamsConfig.TASK_TIMEOUT_MS_CONFIG +); + +if (timeoutException != null) { +throw new TimeoutException(errorMessage, timeoutException); +} else { +throw new TimeoutException(errorMessage); +} +} + +if (timeoutException != null) { +log.debug( +"Timeout exception. Remaining time to deadline {}; retrying.", +deadlineMs - currentWallClockMs, +timeoutException +); Review comment: This is the wrong format for this log message. The exception won't be logged. You have to format the string first: ```suggestion log.debug( String.format("Timeout exception. Remaining time to deadline %d; retrying.", deadlineMs - currentWallClockMs), timeoutException ); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -137,4 +146,48 @@ public void update(final Set topicPartitions, final Map
[jira] [Resolved] (KAFKA-10439) Connect's Values class loses precision for integers, larger than 64 bits
[ https://issues.apache.org/jira/browse/KAFKA-10439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-10439. Resolution: Fixed Merged and back ported to the branches listed under "Fix version" > Connect's Values class loses precision for integers, larger than 64 bits > > > Key: KAFKA-10439 > URL: https://issues.apache.org/jira/browse/KAFKA-10439 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1 > > > The `org.apache.kafka.connect.data.Values#parse` method parses integers, > which are larger than `Long.MAX_VALUE` as `double` with ` > Schema.FLOAT64_SCHEMA`. > > That means it loses precision for these larger integers. > For example: > {code:java} > SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808"); > {code} > returns: > {code:java} > SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18} > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10439) Connect's Values class loses precision for integers, larger than 64 bits
[ https://issues.apache.org/jira/browse/KAFKA-10439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10439: --- Fix Version/s: 2.6.1 2.5.2 2.4.2 2.3.2 2.2.3 2.1.2 2.0.2 > Connect's Values class loses precision for integers, larger than 64 bits > > > Key: KAFKA-10439 > URL: https://issues.apache.org/jira/browse/KAFKA-10439 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1 > > > The `org.apache.kafka.connect.data.Values#parse` method parses integers, > which are larger than `Long.MAX_VALUE` as `double` with ` > Schema.FLOAT64_SCHEMA`. > > That means it loses precision for these larger integers. > For example: > {code:java} > SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808"); > {code} > returns: > {code:java} > SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18} > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine opened a new pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test
kkonstantine opened a new pull request #9379: URL: https://github.com/apache/kafka/pull/9379 Currently `BlockingConnectorTest` incorrectly runs as a unit test. Categorize this test correctly as an integration test by adding the appropriate annotation ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #9378: MINOR: ACLs for secured cluster system tests
rondagostino opened a new pull request #9378: URL: https://github.com/apache/kafka/pull/9378 This PR adds missing broker ACLs required to create topics and SCRAM credentials when ACLs are enabled for a system test. These ACLs were missed for system tests in the PR for KAFKA-10131 (https://github.com/apache/kafka/pull/9274/). This PR also adds support for using PLAINTEXT as the inter broker security protocol when using SCRAM from the client in a system test with a secured cluster-- without this it would always be necessary to set both the inter-broker and client mechanisms to a SCRAM mechanism. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-703966954 @junrao I was able to get some more time to pick this back up, apologies for the long gap in time since my last update. In the most recent commit, I've moved away from tracking producer state snapshot files on the segment and instead maintain an in-memory cache using the ProducerStateManager. There were a few reasons for doing this. Firstly, it became a bit awkward trying to track the producer state snapshot file which we emit during clean shutdown since it does not really have an associated segment file. Second, the producer state truncation/restore logic would need to be moved into the `Log`, since complete view of all producer state snapshot files are required. This is further complicated by the way we handle corrupt snapshot files. Lastly, because we use a "fake" `ProducerStateManager` during segment recovery, we'd have to duplicate a lot of the same logic as exists today to handle truncation/loading outside of the segment lifecycle. Instead, I opted to take an approach similar to the way the `Log` manages segments, by having a `ConcurrentNavigableMap` which tracks snapshot files in memory. As a result, the logic for truncation and restore largely remains the same, but instead of scanning the log directory on every operation we query the in-memory map instead. Deletions are handled in the same way that segment deletions are handled, where the snapshot file is deleted asynchronously along with the segment file. Because we scan the logdir at startup for "stray" snapshot files, it's unnecessary to rename the snapshot files pending deletion with the `.deleted` suffix. This approach has two downsides which I think are relatively minor. 1. When a broker shuts down cleanly and emits a snapshot file, the emitted snapshot file is considered "stray" on the next broker startup. 
While we will clean all "stray" snapshot files except the most recent, we still keep the most recent snapshot file around until the next broker restart. This will result in a single "stray" snapshot file remaining until the next broker restart, at which point the "stray" snapshot file will be deleted. 2. Because we construct a temporary `ProducerStateManager` during segment recovery, and it may delete/create some snapshot files, we need to re-scan the log directory for snapshot files after segment loading is completed but before we load producer state. This is to ensure that the in-memory mapping for the "real" `ProducerStateManager` is up to date. Snapshot file deletion is triggered via `Log.deleteSegmentFiles` when deletion occurs due to retention. When swapping log segments into the log (like with compaction), it appears we have the additional limitation that we don't want to delete snapshot files purely based on the base offset of the deleted segment file. To handle this case, we check to see if the segment file which is being deleted due to the swap has a counterpart new segment which is being swapped in, if it does, we do not delete the snapshot file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine merged pull request #9320: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.
kkonstantine merged pull request #9320: URL: https://github.com/apache/kafka/pull/9320 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
hachikuji commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r499939576 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ## @@ -2358,6 +2359,41 @@ public void testForceShutdownWithIncompleteTransaction() { } } +@Test +public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException { +ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); +TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 6, 100, apiVersions, false); + +setupWithTransactionState(txnManager, false, null); +doInitTransactions(txnManager, producerIdAndEpoch); +// Begin the transaction +txnManager.beginTransaction(); +txnManager.maybeAddPartitionToTransaction(tp0); +client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE))); +// Run it once so that the partition is added to the transaction. +sender.runOnce(); +// Append a record to the accumulator. +FutureRecordMetadata metadata1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1"); +// Now abort the transaction manually. +txnManager.beginAbort(); +// Try to send. +// This should abort the existing transaction and +// drain all the unsent batches with a TransactionAbortedException. +sender.runOnce(); +// Now attempt to fetch the result for the record. +try { +// This should fail since we aborted the transaction. +metadata1.get(); Review comment: We have a helper for this pattern. See `TestUtils.assertFutureThrows` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package
hachikuji merged pull request #9377: URL: https://github.com/apache/kafka/pull/9377 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-703947094 >Do I need to do anything on this? @badaiaqrandista, nope as soon as we can get a green build, I'll merge it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-703939623 @bbejeck Do I need to do anything on this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-703928924 Java 11 and Java 15 passed. Java 8 failed with `org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState` known flaky test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
[ https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208393#comment-17208393 ] Bill Bejeck commented on KAFKA-10405: - Saw same error again - https://github.com/apache/kafka/pull/9099/checks?check_run_id=1210606325 > Flaky Test > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState > --- > > Key: KAFKA-10405 > URL: https://issues.apache.org/jira/browse/KAFKA-10405 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bill Bejeck >Priority: Major > > From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/] >  > {noformat} > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > > shouldRestoreState FAILED > 14:25:19 java.lang.AssertionError: Condition not met within timeout > 6. Repartition topic > restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged > data after 6 ms. > 14:25:19 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > 14:25:19 at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > 14:25:19 at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > 14:25:19 at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > 14:25:19 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > 14:25:19 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) > 14:25:19 at > org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9237: URL: https://github.com/apache/kafka/pull/9237#issuecomment-703926657 Note the tests for JDK 11 passed (https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9237/12/?cloudbees-analytics-link=scm-reporting%2Fstage%2Fpending) but the job isn't exiting. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on a change in pull request #9237: URL: https://github.com/apache/kafka/pull/9237#discussion_r499908753 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(value = Parameterized.class) +@Category({IntegrationTest.class}) +public class StreamTableJoinTopologyOptimizationIntegrationTest { +private static final int NUM_BROKERS = 1; + +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + +private String tableTopic; +private String inputTopic; +private String outputTopic; +private String applicationId; + +private Properties streamsConfiguration; + +@Rule +public TestName testName = new TestName(); + +@Parameterized.Parameter +public String topologyOptimization; + +@Parameterized.Parameters(name = "Optimization = {0}") +public static Collection topologyOptimization() { +return Arrays.asList(new String[][]{ +{StreamsConfig.OPTIMIZE}, +{StreamsConfig.NO_OPTIMIZATION} +}); +} + +@Before +public void before() throws InterruptedException { +streamsConfiguration = new Properties(); + +final String safeTestName = safeUniqueTestName(getClass(), testName); + +tableTopic = "table-topic" + safeTestName; +inputTopic = "stream-topic-" + safeTestName; +outputTopic = "output-topic-" + safeTestName; +applicationId = "app-" + safeTestName; + +CLUSTER.createTopic(inputTopic, 4, 1); +CLUSTER.createTopic(tableTopic, 2, 1); +CLUSTER.createTopic(outputTopic, 4, 1); + +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); +streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); +streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
[GitHub] [kafka] guozhangwang merged pull request #9342: MINOR: Update doc for raft state metrics
guozhangwang merged pull request #9342: URL: https://github.com/apache/kafka/pull/9342 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package
guozhangwang commented on pull request #9377: URL: https://github.com/apache/kafka/pull/9377#issuecomment-703906935 LGTM! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208344#comment-17208344 ] Kyle Leiby commented on KAFKA-7421: --- Hi all, we've been encountering a similar deadlock (I think the same as the one [~xakassi] is seeing). We are running a single Debezium JAR inside a {{confluentinc/cp-kafka-connect-base:5.5.1-1-deb8}} container. We tried several 5.x Debian 8 images, and encounter the deadlocks in all of them. Here's the relevant portion from an example thread dump: {code:java} Found one Java-level deadlock: = "StartAndStopExecutor-connect-1-2": waiting to lock monitor 0x7f2b68001d58 (object 0xc118c3c8, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader), which is held by "StartAndStopExecutor-connect-1-1" "StartAndStopExecutor-connect-1-1": waiting to lock monitor 0x7f2b68001eb8 (object 0xc510, a org.apache.kafka.connect.runtime.isolation.PluginClassLoader), which is held by "StartAndStopExecutor-connect-1-2" Java stack information for the threads listed above: === "StartAndStopExecutor-connect-1-2": at java.lang.ClassLoader.loadClass(ClassLoader.java:404) - waiting to lock <0xc118c3c8> (a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:397) at java.lang.ClassLoader.loadClass(ClassLoader.java:411) - locked <0xc6a9e908> (a java.lang.Object) at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104) - locked <0xc6a9e908> (a java.lang.Object) - locked <0xc510> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719) at 
org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311) at org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:215) at org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:209) at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:432) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:127) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1201) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1197) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) "StartAndStopExecutor-connect-1-1": at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91) - waiting to lock <0xc510> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:394) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719) at org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311) at org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:215) at org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:209) at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:251) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1229) at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:127) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1245) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1241) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Thre
[GitHub] [kafka] hachikuji opened a new pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package
hachikuji opened a new pull request #9377: URL: https://github.com/apache/kafka/pull/9377 To avoid confusion, since it is only used by `TestRaftServer`, this PR moves `RaftRequestHandler` to the `tools` package. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata
hachikuji merged pull request #9349: URL: https://github.com/apache/kafka/pull/9349 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208316#comment-17208316 ] Sophie Blee-Goldman commented on KAFKA-5998: [~sandeep.lakdaw...@gmail.com] are you running all three instances on the same machine with a shared state directory? And/or using /tmp as the state directory? > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.intern
[GitHub] [kafka] hachikuji opened a new pull request #9376: MINOR: Remove `TargetVoters` from `DescribeQuorum`
hachikuji opened a new pull request #9376: URL: https://github.com/apache/kafka/pull/9376 This field is left over from the early days of the KIP, when it covered reassignment. Since the API is not exposed yet, there should be no harm in updating the first version. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499871426 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { +final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh); +synchronized (stateLock) { +if (state == State.CREATED) { +for (final StreamThread thread : threads) { +if (eh != null) { +thread.setStreamsUncaughtExceptionHandler(handler); +} else { +final StreamsUncaughtExceptionHandler defaultHandler = exception -> + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD; + thread.setStreamsUncaughtExceptionHandler(defaultHandler); +} +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + +"Current state is: " + state); +} +} +} + +private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e); +switch (action) { +case SHUTDOWN_STREAM_THREAD: +log.error("Encountered the following exception during processing " + +"and the thread is going to shut down: ", e); +break; +case REPLACE_STREAM_THREAD: +log.error("Encountered the following exception during processing " + +"and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged +break; +case SHUTDOWN_KAFKA_STREAMS_CLIENT: +log.error("Encountered the following exception during processing " + +"and the client is going to shut down: ", e); +for (final StreamThread streamThread: threads) { +streamThread.shutdown(); +} Review comment: I am okay with renaming, but I will wait to for everything else to be cleared up to see if it is still necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499871031 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { +metrics.close(); Review comment: We did have it this way in the KIP. If we stick to this for now, I think that we can clear this up easily when we decide what we want to do with the states in general, when we take care of the discussion in KIP-663. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499868891 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); Review comment: That seems to be a fine solution. It will still attempt the shutdown but may fail. As long as we warn I guess it will work This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite
vvcephei commented on pull request #8353: URL: https://github.com/apache/kafka/pull/8353#issuecomment-703874939 Cherry-picked to 2.4 and 2.3 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik edited a comment on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486 @junrao Thanks for the review! I've addressed the latest comments in e55358fd1a00f12ef98fc4d2d649a297ddf146da . The PR is ready for another pass. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486 @junrao Thanks for the review! I've addressed the latest comments in e55358fd1a00f12ef98fc4d2d649a297ddf146da . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite
vvcephei commented on pull request #8353: URL: https://github.com/apache/kafka/pull/8353#issuecomment-703869908 cherry-picked to 2.5 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei commented on pull request #9262: URL: https://github.com/apache/kafka/pull/9262#issuecomment-703867931 Cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
vvcephei commented on pull request #9323: URL: https://github.com/apache/kafka/pull/9323#issuecomment-703867815 Cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499847021 ## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ## @@ -715,7 +747,58 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { doAnswer((_: InvocationOnMock) => { latch.countDown() }).doCallRealMethod().when(spyThread).awaitShutdown() -controller.shutdown() +controller.shutdown() + } + + private def testControllerFeatureZNodeSetup(initialZNode: Option[FeatureZNode], + interBrokerProtocolVersion: ApiVersion): Unit = { +val versionBeforeOpt = initialZNode match { + case Some(node) => +zkClient.createFeatureZNode(node) +Some(zkClient.getDataAndVersion(FeatureZNode.path)._2) + case None => +Option.empty +} +servers = makeServers(1, interBrokerProtocolVersion = Some(interBrokerProtocolVersion)) +TestUtils.waitUntilControllerElected(zkClient) Review comment: Done. Please take a look at the fix. I've added logic to wait for processing on a dummy event just after waiting for controller election. I'm hoping this will make sure the controller failover logic is completed before the test proceeds further to make assertions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499843780 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); try { -streams.setUncaughtExceptionHandler(null); + streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null); Review comment: Ah, gotcha. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { +metrics.close(); Review comment: It certainly might. I'm just wary of how far into the uncanny valley we're going here. Streams is going to be put into a state that's very similar to the one that `close()` produces, but not identical. What will then happen when they _do_ call close? What will happen when we realize that something else needs to be done as part of closing the instance (will we even remember that we should consider doing it here as well)? OTOH, we could instead change direction on the "error-vs-shutdown" debate and just make all these methods call `close(ZERO)` instead. Then, the _real_ close method will be invoked, and Streams will go through a well-known transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`. It would then be a problem for a later date (after KIP-663) if someone wanted to request that instead the app should stop all running threads so they can manually call "addThread" later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { +metrics.close(); Review comment: It certainly might. I'm just wary of how far into the uncanny valley we're going here. Streams is going to be put into a state that's very similar to the one that `close()` produces, but not identical. What will then happen when they _do_ call close? OTOH, we could instead change direction on the "error-vs-shutdown" debate and just make all these methods call `close(ZERO)` instead. Then, the _real_ close method will be invoked, and Streams will go through a well-known transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`. It would then be a problem for a later date (after KIP-663) if someone wanted to request that instead the app should stop all running threads so they can manually call "addThread" later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499838819 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); Review comment: That's a good question. Maybe we could just document that that option is an unsuitable response to an Error, and also log an `ERROR` message if you select it in response to an Error. It's not _always_ bad to ignore an Error, but it usually is. We can leave it to users to decide what they want to do. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499836489 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { Review comment: Oh, sure. Now I know why you picked this name :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10531) KafkaBasedLog can sleep for negative values
[ https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10531: -- Fix Version/s: 2.5.2 2.7.0 > KafkaBasedLog can sleep for negative values > --- > > Key: KAFKA-10531 > URL: https://issues.apache.org/jira/browse/KAFKA-10531 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > Fix For: 2.7.0, 2.5.2, 2.6.1 > > > {{time.milliseconds}} is not monotonic, so this code can throw : > {{java.lang.IllegalArgumentException: timeout value is negative}} > > {code:java} > long started = time.milliseconds(); > while (partitionInfos == null && time.milliseconds() - started < > CREATE_TOPIC_TIMEOUT_MS) { > partitionInfos = consumer.partitionsFor(topic); > Utils.sleep(Math.min(time.milliseconds() - started, 1000)); > } > {code} > We need to check for negative value before sleeping. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
dima5rr commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-703838454 Hi @guozhangwang, can you trigger a new build? The failures look like flaky tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499816619 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + 
s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. +val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499816373 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + 
s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. +val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499812076 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + 
s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. +val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] rhauch merged pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
rhauch merged pull request #9347: URL: https://github.com/apache/kafka/pull/9347 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499811752 ## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ## @@ -185,7 +185,7 @@ class BrokerEndPointTest { "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, "rack":"dc1", - "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}} Review comment: Done. Nice catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499811265 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. 
+ * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. 
This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. + *- If the node is absent, it will react by creating a FeatureZNode with disabled status + * and empty finalized features. + *- Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. + * - If the node is in disabled status, the controller won't upgrade all features immediately. + *
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. 
+ * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. 
This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. + *- If the node is absent, it will react by creating a FeatureZNode with disabled status + * and empty finalized features. + *- Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. + * - If the node is in disabled status, the controller won't upgrade all features immediately. + *
[jira] [Assigned] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10559: --- Assignee: Sagar Rao > Don't shutdown the entire app upon TimeoutException during internal topic > validation > > > Key: KAFKA-10559 > URL: https://issues.apache.org/jira/browse/KAFKA-10559 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Blocker > Fix For: 2.7.0 > > > During some of the KIP-572 work, we made things pretty brittle by changing > the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` > error code and shut down the entire application if a TimeoutException is hit > during the internal topic creation/validation. > Internal topic validation occurs during every rebalance, and we have seen it > time out on topic discovery in unstable environments. So shutting down the > entire application seems like a step in the wrong direction, and antithetical > to the goal of KIP-572 (improving the resiliency of Streams in the face of > TimeoutExceptions) > I'm not totally sure what the previous behavior was, but it seems to me we > have three options: > # Rethrow the TimeoutException and allow it to kill the thread > # Swallow the TimeoutException and retry the rebalance indefinitely > # Some combination of the above: swallow the TimeoutException but don't > retry indefinitely: > ## Start a timer and allow retrying rebalances for up the configured > task.timeout.ms, the timeout config introduced in KIP-572 > ## Retry for some constant number of rebalances > I think if we go with option 3, then shutting down the entire application is > relatively more palatable, as we have given the environment a chance to > stabilize. 
> But, killing the thread still seems preferable, given the two new features > that are coming out soon: the ability to start up new threads, and the > improved exception handler that allows the user to choose to shut down the > entire application if that's really what they want. Once users have this > level of control over the application, we should allow them to decide how > they want to handle exceptional cases like this, rather than forcing an > option on them (eg shutdown everything) >  > Imo we should fix this before 2.7 comes out, even if it's just a partial fix > (eg we do option 1 in 2.7, but plan to implement option 3 eventually) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment
piotrrzysko commented on pull request #9371: URL: https://github.com/apache/kafka/pull/9371#issuecomment-703823226 Hi @stanislavkozlovski, would you mind taking a look at this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208257#comment-17208257 ] Sophie Blee-Goldman commented on KAFKA-10559: - [~sagarrao] Yeah, go ahead! This should be a pretty small PR so it would be great if we could knock it out in the next week or two. Just ping me when it's ready. For the PR itself, I think it sounds reasonable to just rethrow the TimeoutException to kill the thread. The "add/recover stream thread" functionality will probably slip 2.7, but it'll be implemented soon. So we don't really need to go out of our way to save a single thread from death in rare circumstances imo > Don't shutdown the entire app upon TimeoutException during internal topic > validation > > > Key: KAFKA-10559 > URL: https://issues.apache.org/jira/browse/KAFKA-10559 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.7.0 > > > During some of the KIP-572 work, we made things pretty brittle by changing > the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` > error code and shut down the entire application if a TimeoutException is hit > during the internal topic creation/validation. > Internal topic validation occurs during every rebalance, and we have seen it > time out on topic discovery in unstable environments. 
So shutting down the > entire application seems like a step in the wrong direction, and antithetical > to the goal of KIP-572 (improving the resiliency of Streams in the face of > TimeoutExceptions) > I'm not totally sure what the previous behavior was, but it seems to me we > have three options: > # Rethrow the TimeoutException and allow it to kill the thread > # Swallow the TimeoutException and retry the rebalance indefinitely > # Some combination of the above: swallow the TimeoutException but don't > retry indefinitely: > ## Start a timer and allow retrying rebalances for up the configured > task.timeout.ms, the timeout config introduced in KIP-572 > ## Retry for some constant number of rebalances > I think if we go with option 3, then shutting down the entire application is > relatively more palatable, as we have given the environment a chance to > stabilize. > But, killing the thread still seems preferable, given the two new features > that are coming out soon: the ability to start up new threads, and the > improved exception handler that allows the user to choose to shut down the > entire application if that's really what they want. Once users have this > level of control over the application, we should allow them to decide how > they want to handle exceptional cases like this, rather than forcing an > option on them (eg shutdown everything) >  > Imo we should fix this before 2.7 comes out, even if it's just a partial fix > (eg we do option 1 in 2.7, but plan to implement option 3 eventually) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on pull request #9347: URL: https://github.com/apache/kafka/pull/9347#issuecomment-703805880 There was an error when building `streams.examples`: ``` [2020-10-05T08:40:05.722Z] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: A Maven project already exists in the directory /home/jenkins/workspace/Kafka_kafka-pr_PR-9347/streams/quickstart/test-streams-archetype/streams.examples -> [Help 1] ``` The failure is not related to this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9237: URL: https://github.com/apache/kafka/pull/9237#issuecomment-703803731 @lkokhreidze, thanks for the quick update. I'll make another pass soon. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
junrao commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420 ## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ## @@ -185,7 +185,7 @@ class BrokerEndPointTest { "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, "rack":"dc1", - "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}} Review comment: Should we revert the changes here? ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. 
The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. 
Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absen
[jira] [Resolved] (KAFKA-9585) Flaky Test: LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization
[ https://issues.apache.org/jira/browse/KAFKA-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-9585. -- Resolution: Cannot Reproduce > Flaky Test: > LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization > > > Key: KAFKA-9585 > URL: https://issues.apache.org/jira/browse/KAFKA-9585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > Failed for me locally with > {noformat} > java.lang.AssertionError: Condition not met within timeout 12. Should > obtain non-empty lag information eventually > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10555) Improve client state machine
[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208231#comment-17208231 ] Sophie Blee-Goldman commented on KAFKA-10555: - Just to clarify, I do agree with Matthias that we shouldn't transit to ERROR if the last stream thread is removed via the new removeStreamThread() method. I thought we were only considering to transit to ERROR if the last thread died, but to transit to NOT_RUNNING if the last thread was removed by the user. This seems consistent with the current behavior and maintains the same semantic meaning of the ERROR state, imo. I don't think we can say that "transiting to ERROR if the last thread is removed" is following the current behavior, because there is no way to remove a thread at the moment. So, we should just do what makes the most sense for this case. Personally I think that would be to transit to NOT_RUNNING, since this is not an error or exceptional case but rather a valid user action. I also agree with something that [~vvcephei] suggested earlier, which is that this should be part of the KIP discussion. At the very least, we should raise the final proposal on the discussion thread in case there are any objections. > Improve client state machine > > > Key: KAFKA-10555 > URL: https://issues.apache.org/jira/browse/KAFKA-10555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > The KafkaStreams client exposes its state to the user for monitoring purpose > (ie, RUNNING, REBALANCING etc). The state of the client depends on the > state(s) of the internal StreamThreads that have their own states. > Furthermore, the client state has impact on what the user can do with the > client. For example, active task can only be queried in RUNNING state and > similar. > With KIP-671 and KIP-663 we improved error handling capabilities and allow to > add/remove stream thread dynamically.
We allow adding/removing threads only > in RUNNING and REBALANCING state. This puts us in a "weird" position, because > if we enter ERROR state (ie, if the last thread dies), we cannot add new > threads any longer. However, if we have multiple threads and one dies, we > don't enter ERROR state and do allow to recover the thread. > Before the KIPs the definition of ERROR state was clear, however, with both > KIPs it seems that we should revisit its semantics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10530) kafka-streams-application-reset misses some internal topics
[ https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10530. -- Resolution: Duplicate Closing now, since this seems like a duplicate report, and visual code inspection indicates it should have been fixed. If you do still see this [~oweiler] , please feel free to re-open the ticket. > kafka-streams-application-reset misses some internal topics > --- > > Key: KAFKA-10530 > URL: https://issues.apache.org/jira/browse/KAFKA-10530 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 2.6.0 >Reporter: Oliver Weiler >Priority: Major > > While the {{kafka-streams-application-reset}} tool works in most cases, it > misses some internal topics when using {{Foreign Key Table-Table Joins}}. > After execution, there are still two internal topics left which were not > deleted > {code} > bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic > bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer > bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic > {code} > The reason seems to be the {{StreamsResetter.isInternalTopic}} which requires > the internal topic to end with {{-changelog}} or {{-repartition}} (which the > mentioned topics don't). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10562) KIP-478: Delegate the store wrappers to the new init method
[ https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10562: - Summary: KIP-478: Delegate the store wrappers to the new init method (was: Delegate the store wrappers to the new init method) > KIP-478: Delegate the store wrappers to the new init method > --- > > Key: KAFKA-10562 > URL: https://issues.apache.org/jira/browse/KAFKA-10562 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10546) KIP-478: Deprecate old PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10546: - Component/s: streams > KIP-478: Deprecate old PAPI > --- > > Key: KAFKA-10546 > URL: https://issues.apache.org/jira/browse/KAFKA-10546 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Can't be done until after the DSL internals are migrated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10544) Convert KTable aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10544: - Component/s: streams > Convert KTable aggregations to new PAPI > --- > > Key: KAFKA-10544 > URL: https://issues.apache.org/jira/browse/KAFKA-10544 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10562) Delegate the store wrappers to the new init method
[ https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10562: - Component/s: streams > Delegate the store wrappers to the new init method > -- > > Key: KAFKA-10562 > URL: https://issues.apache.org/jira/browse/KAFKA-10562 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10539) Convert KStreamImpl joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10539: - Component/s: streams > Convert KStreamImpl joins to new PAPI > - > > Key: KAFKA-10539 > URL: https://issues.apache.org/jira/browse/KAFKA-10539 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10538) Convert KStreamImpl maps to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10538: - Component/s: streams > Convert KStreamImpl maps to new PAPI > > > Key: KAFKA-10538 > URL: https://issues.apache.org/jira/browse/KAFKA-10538 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10540) Convert KStream aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10540: - Component/s: streams > Convert KStream aggregations to new PAPI > > > Key: KAFKA-10540 > URL: https://issues.apache.org/jira/browse/KAFKA-10540 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10542) Convert KTable maps to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10542: - Component/s: streams > Convert KTable maps to new PAPI > --- > > Key: KAFKA-10542 > URL: https://issues.apache.org/jira/browse/KAFKA-10542 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10543) Convert KTable joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10543: - Component/s: streams > Convert KTable joins to new PAPI > > > Key: KAFKA-10543 > URL: https://issues.apache.org/jira/browse/KAFKA-10543 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10541) Convert KTable filters to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10541: - Component/s: streams > Convert KTable filters to new PAPI > -- > > Key: KAFKA-10541 > URL: https://issues.apache.org/jira/browse/KAFKA-10541 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10537) Convert KStreamImpl filters to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10537: - Component/s: streams > Convert KStreamImpl filters to new PAPI > --- > > Key: KAFKA-10537 > URL: https://issues.apache.org/jira/browse/KAFKA-10537 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes
[ https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10536: - Fix Version/s: 2.7.0 > KIP-478: Implement KStream changes > -- > > Key: KAFKA-10536 > URL: https://issues.apache.org/jira/browse/KAFKA-10536 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes
[ https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10536: - Component/s: streams > KIP-478: Implement KStream changes > -- > > Key: KAFKA-10536 > URL: https://issues.apache.org/jira/browse/KAFKA-10536 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10535: - Fix Version/s: 2.7.0 > KIP-478: Implement StateStoreContext and Record > --- > > Key: KAFKA-10535 > URL: https://issues.apache.org/jira/browse/KAFKA-10535 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10535. -- Resolution: Fixed > KIP-478: Implement StateStoreContext and Record > --- > > Key: KAFKA-10535 > URL: https://issues.apache.org/jira/browse/KAFKA-10535 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10437: - Fix Version/s: 2.7.0 > KIP-478: Implement test-utils changes > - > > Key: KAFKA-10437 > URL: https://issues.apache.org/jira/browse/KAFKA-10437 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10535: - Component/s: streams > KIP-478: Implement StateStoreContext and Record > --- > > Key: KAFKA-10535 > URL: https://issues.apache.org/jira/browse/KAFKA-10535 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10437: - Component/s: streams > KIP-478: Implement test-utils changes > - > > Key: KAFKA-10437 > URL: https://issues.apache.org/jira/browse/KAFKA-10437 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10437: - Component/s: (was: streams-test-utils) > KIP-478: Implement test-utils changes > - > > Key: KAFKA-10437 > URL: https://issues.apache.org/jira/browse/KAFKA-10437 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10437: - Component/s: streams-test-utils > KIP-478: Implement test-utils changes > - > > Key: KAFKA-10437 > URL: https://issues.apache.org/jira/browse/KAFKA-10437 > Project: Kafka > Issue Type: Sub-task > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10436) Implement KIP-478 Topology changes
[ https://issues.apache.org/jira/browse/KAFKA-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10436: - Component/s: streams > Implement KIP-478 Topology changes > -- > > Key: KAFKA-10436 > URL: https://issues.apache.org/jira/browse/KAFKA-10436 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on pull request #9375: URL: https://github.com/apache/kafka/pull/9375#issuecomment-703775022 Thanks @kkonstantine, I've added a comment and addressed the Checkstyle issues. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on a change in pull request #9375: URL: https://github.com/apache/kafka/pull/9375#discussion_r499754411 ## File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java ## @@ -73,6 +76,15 @@ INT_LIST.add(-987654321); } +@Test(timeout = 5000) +public void shouldNotEncounterInfiniteLoop() { +byte[] bytes = new byte[] { -17, -65, -65 }; Review comment: 👍 good call, will add This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on a change in pull request #9375: URL: https://github.com/apache/kafka/pull/9375#discussion_r499754411 ## File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java ## @@ -73,6 +76,15 @@ INT_LIST.add(-987654321); } +@Test(timeout = 5000) +public void shouldNotEncounterInfiniteLoop() { +byte[] bytes = new byte[] { -17, -65, -65 }; Review comment: 👍 good call, will add This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
kkonstantine commented on a change in pull request #9375: URL: https://github.com/apache/kafka/pull/9375#discussion_r499750739 ## File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java ## @@ -73,6 +76,15 @@ INT_LIST.add(-987654321); } +@Test(timeout = 5000) +public void shouldNotEncounterInfiniteLoop() { +byte[] bytes = new byte[] { -17, -65, -65 }; Review comment: we need a comment here to explain things. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-7334: --- Labels: newbie (was: ) > Suggest changing config for state.dir in case of FileNotFoundException > -- > > Key: KAFKA-7334 > URL: https://issues.apache.org/jira/browse/KAFKA-7334 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Priority: Major > Labels: newbie > > Quoting stack trace from KAFKA-5998 : > {code} > WARN [2018-08-22 03:17:03,745] > org.apache.kafka.streams.processor.internals.ProcessorStateManager: task > [0_45] Failed to write offset checkpoint file to > /tmp/kafka-streams/ > {{ /0_45/.checkpoint: {}}} > {{ ! java.nio.file.NoSuchFileException: > /tmp/kafka-streams//0_45/.checkpoint.tmp}} > {{ ! at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}} > {{ ! at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}} > {code} > When state.dir is left at default configuration, there is a chance that > certain files under the state directory are cleaned by OS since the default > dir starts with /tmp/kafka-streams. > [~mjsax] and I proposed to suggest user, through exception message, to change > the location for state.dir . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208194#comment-17208194 ] Sagar Rao commented on KAFKA-10559: --- hey [~ableegoldman], I can pick this one if needed? Is there anything more that you would want to add apart from the nicely worded description? > Don't shutdown the entire app upon TimeoutException during internal topic > validation > > > Key: KAFKA-10559 > URL: https://issues.apache.org/jira/browse/KAFKA-10559 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.7.0 > > > During some of the KIP-572 work, we made things pretty brittle by changing > the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` > error code and shut down the entire application if a TimeoutException is hit > during the internal topic creation/validation. > Internal topic validation occurs during every rebalance, and we have seen it > time out on topic discovery in unstable environments. So shutting down the > entire application seems like a step in the wrong direction, and antithetical > to the goal of KIP-572 (improving the resiliency of Streams in the face of > TimeoutExceptions) > I'm not totally sure what the previous behavior was, but it seems to me we > have three options: > # Rethrow the TimeoutException and allow it to kill the thread > # Swallow the TimeoutException and retry the rebalance indefinitely > # Some combination of the above: swallow the TimeoutException but don't > retry indefinitely: > ## Start a timer and allow retrying rebalances for up to the configured > task.timeout.ms, the timeout config introduced in KIP-572 > ## Retry for some constant number of rebalances > I think if we go with option 3, then shutting down the entire application is > relatively more palatable, as we have given the environment a chance to > stabilize.
> But, killing the thread still seems preferable, given the two new features > that are coming out soon: the ability to start up new threads, and the > improved exception handler that allows the user to choose to shut down the > entire application if that's really what they want. Once users have this > level of control over the application, we should allow them to decide how > they want to handle exceptional cases like this, rather than forcing an > option on them (eg shutdown everything) >  > Imo we should fix this before 2.7 comes out, even if it's just a partial fix > (eg we do option 1 in 2.7, but plan to implement option 3 eventually) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
lkokhreidze commented on a change in pull request #9237: URL: https://github.com/apache/kafka/pull/9237#discussion_r485886423 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -0,0 +1,242 @@ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; 
+import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(value = Parameterized.class) +@Category({IntegrationTest.class}) +public class StreamTableJoinTopologyOptimizationIntegrationTest { Review comment: There's already another `StreamTableIntegrationTest` present, but it works with `TopologyTestDriver` so I thought it would be better and easier to keep them separate. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org