[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
showuon commented on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051784959 > unknownKeys contains neither usedKeys nor unusedKeys. > Only configKeys that are not officially recognized by kafka will be put into unknownKeys. Yes, this is what I expected. Thanks. But that also confuses me: if the `unknown` keys contain neither `used` nor `unused` keys, why does this test fail after we assert on `unknown`? > However, after valuesWithPrefixOverride.get("sasl.mechanism") add assertFalse(config.unknown().contains("prefix.sasl.mechanism"));, testCase will fail to verify; because unknownKeys is only affected by originals and values, the value is originalKeys.removeAll(valueKeys). In the above test case, we originally expected `prefix.sasl.mechanism` to be `used` and not in the `unused` keys. After this PR, we should expect `prefix.sasl.mechanism` to be `used`, not in the `unused` keys, and **NOT** in the `unknown` keys, right? Did I miss anything here? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away
[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498411#comment-17498411 ] RivenSun commented on KAFKA-13694: -- Hi [~guozhang] I looked at the code on the KafkaProducer side again. In the Sender#failBatch(...) method, when passing the result to the user's callback, we only return the *PartitionResponse.error* field and do not return the *PartitionResponse.recordErrors* field. 1. The PartitionResponse.errorMessage field cannot solve the current problem. Sometimes errorMessage has no value, as in the RecordTooLargeException case; other times its content doesn't tell the user why the send failed, as in this JIRA's example, where PartitionResponse.errorMessage=rve.invalidException.getMessage. 2. We need to put the PartitionResponse.recordErrors information into the "Exception exception" passed to the user callback, and may need to add a toString method to the ProduceResponse.RecordError class. In fact, as I mentioned in the JIRA, the PartitionResponse#toString method already calls RecordError#toString, but RecordError has so far lacked its own toString method. What do you think? Thanks. > Some InvalidRecordException messages are thrown away > > > Key: KAFKA-13694 > URL: https://issues.apache.org/jira/browse/KAFKA-13694 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.0.0 >Reporter: RivenSun >Priority: Major > > 1.Example > Topic level config:"cleanup.policy":"compact" > But when the producer sends the message, the ProducerRecord does not specify > the key. 
> > producer.log > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One > or more records have been rejected {code} > > > server.log > {code:java} > [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing > append operation on partition rivenTest4-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: One or more records have been > rejected {code} > From the producer and server logs, we cannot tell why the send > failed, only that the server rejected the message. > Compare this with the RecordTooLargeException test case: the producer clearly reports the > reason for the failure, and the server does not print a > log (the reason is explained later). > producer_message_too_large.log : > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. {code} > 2.RootCause > ReplicaManager#appendToLocalLog(...) -> > Partition#appendRecordsToLeader(...) -> > UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) -> > LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyzing the validateMessagesAndAssignOffsets method: > The LogValidator#validateRecord method calls validateKey and validateTimestamp > and collects the error information for all messages as > Seq[ApiRecordError]. > In the subsequent processRecordErrors(recordErrors) method, only > Errors.INVALID_TIMESTAMP currently gets special handling. Because the error > returned by validateKey is the ordinary Errors.INVALID_RECORD, the > code falls through to > {code:java} > else { > throw new RecordValidationException(new InvalidRecordException( > "One or more records have been rejected"), errors) > }{code} > In fact, the *errors* variable here contains the specific information for each > recordError, but we do not put that information into the message of the > InvalidRecordException. > 2) The exception thrown by processRecordErrors is caught by > ReplicaManager#appendToLocalLog(...), so we continue by analyzing the > `catchException code` of appendToLocalLog. > This also explains why the server does not print a log for > RecordTooLargeException. > In the case rve: RecordValidationException, > the server prints the log in the processFailedRecord method > and sends a response to the client via the LogAppendResult method. > Both methods only use rve.invalidException; > the server neither prints rve.recordErrors nor returns it to the > client. > 3.Solution > There are two possible solutions; I prefer the second. > 1) Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns > Errors.INVALID_RECORD_WITHOUT_KEY, > In the processRecordErrors
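To illustrate the idea of surfacing per-record errors instead of only the generic "One or more records have been rejected" text, here is a minimal Java sketch that folds each record's batch index and reason into the top-level message. `RecordErrorInfo` and `RejectionMessages.describeRejection` are hypothetical names introduced for illustration. They are not Kafka's `ProduceResponse.RecordError` API, whose actual fields and toString may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a per-record error; NOT Kafka's ProduceResponse.RecordError.
final class RecordErrorInfo {
    final int batchIndex;   // position of the rejected record within its batch
    final String message;   // reason the broker rejected the record

    RecordErrorInfo(int batchIndex, String message) {
        this.batchIndex = batchIndex;
        this.message = message;
    }

    // Without an override, any object that prints its record errors shows only ClassName@hash.
    @Override
    public String toString() {
        return "RecordError(batchIndex=" + batchIndex + ", message='" + message + "')";
    }
}

final class RejectionMessages {
    // Hypothetical helper: append the per-record details to the generic top-level message.
    static String describeRejection(String baseMessage, List<RecordErrorInfo> recordErrors) {
        if (recordErrors.isEmpty()) {
            return baseMessage; // nothing more specific to report
        }
        String details = recordErrors.stream()
                .map(RecordErrorInfo::toString)
                .collect(Collectors.joining(", "));
        return baseMessage + ": " + details;
    }
}
```

Keeping the original base message as a prefix means anything that matches on the old text still sees it, while the user callback also gets the specific reason for each rejected record.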
[GitHub] [kafka] RivenSun2 commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
RivenSun2 commented on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051735512 Hi @showuon Currently the logic of the `unknown()` method is exactly what you would expect. `unknownKeys=originalKeys.removeAll(valueKeys)` `unknownKeys` contains neither `usedKeys` nor `unusedKeys`. Only configKeys that are not officially recognized by kafka will be put into `unknownKeys`. Thanks.
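The formula quoted above, `unknownKeys=originalKeys.removeAll(valueKeys)`, can be modelled with plain sets. This is a simplified sketch, not `AbstractConfig` itself. It assumes `values` holds only keys defined in the ConfigDef. It shows why a prefixed key such as `prefix.sasl.mechanism` still lands in `unknown` even after it has been read through `valuesWithPrefixOverride`: nothing in the formula looks at which keys were used.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the key-set formula discussed in the PR; not AbstractConfig itself.
final class ConfigKeySets {
    // unknown = original keys minus the keys that were parsed into values (defined in the ConfigDef)
    static Set<String> unknown(Map<String, ?> originals, Map<String, ?> values) {
        Set<String> keys = new HashSet<>(originals.keySet()); // copy so the caller's map is untouched
        keys.removeAll(values.keySet());
        return keys;
    }
}
```

For example, with originals `{sasl.mechanism, prefix.sasl.mechanism, foo}` and values `{sasl.mechanism}`, `unknown` is `{prefix.sasl.mechanism, foo}`.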
[GitHub] [kafka] ableegoldman commented on a change in pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED
ableegoldman commented on a change in pull request #11813: URL: https://github.com/apache/kafka/pull/11813#discussion_r815270894 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); -if (resetOffsets) { +if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); -if (!partitionsToReset.isEmpty()) { Review comment: The offset reset code is pretty long so I pulled it out into its own method to clean things up a bit ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); -if (resetOffsets) { +if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? 
"unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); -if (!partitionsToReset.isEmpty()) { -removeTopologyFuture.whenComplete((v, throwable) -> { -if (throwable != null) { -removeTopologyFuture.completeExceptionally(throwable); -} -DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; -while (deleteOffsetsResult == null) { -try { -deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); -deleteOffsetsResult.all().get(); -} catch (final InterruptedException ex) { +resetOffsets(removeTopologyFuture, partitionsToReset); +} +return new RemoveNamedTopologyResult(removeTopologyFuture); +} + +/** + * @return true iff the application is still in CREATED and the future was completed + */ +private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl updateTopologyFuture, Review comment: This is the main fix
[GitHub] [kafka] ableegoldman commented on a change in pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED
ableegoldman commented on a change in pull request #11813: URL: https://github.com/apache/kafka/pull/11813#discussion_r815270807 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) { /** * Start up Streams with a collection of initial NamedTopologies (may be empty) */ -public void start(final Collection initialTopologies) { +public synchronized void start(final Collection initialTopologies) { Review comment: `super.start()` is already synchronized but we should just go ahead and synchronize at the first layer ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -145,7 +145,7 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name */ -public Optional getTopologyByName(final String name) { +public synchronized Optional getTopologyByName(final String name) { Review comment: Should make sure this is thread safe since it's how we check to make sure a name isn't already used when trying to add a new topology
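The second review comment describes a check-then-act pattern: look up a name, and add the topology only if the name is free. The following minimal Java sketch shows why the lookup and the insert need to share one lock. `TopologyRegistry` is a hypothetical class for illustration, not the Kafka Streams wrapper. Topologies are stored as plain `Object`s.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry illustrating a synchronized check-then-act; not the Kafka Streams class.
final class TopologyRegistry {
    private final Map<String, Object> topologies = new HashMap<>();

    synchronized Optional<Object> getByName(String name) {
        return Optional.ofNullable(topologies.get(name));
    }

    // The existence check and the insert run under the same monitor, so two threads
    // adding the same name cannot both pass the check.
    synchronized boolean add(String name, Object topology) {
        if (getByName(name).isPresent()) { // re-entrant: this thread already holds the lock
            return false;
        }
        topologies.put(name, topology);
        return true;
    }
}
```

Java monitors are re-entrant, so calling the synchronized `getByName` from inside the synchronized `add` does not deadlock.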
[GitHub] [kafka] ableegoldman opened a new pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED
ableegoldman opened a new pull request #11813: URL: https://github.com/apache/kafka/pull/11813 Currently the #add/removeNamedTopology APIs are a little wonky when the application is still in CREATED. Since adding and removing topologies runs some validation steps, there is a valid reason to add or remove a topology on a dummy app that you don't plan to start, or on a real app that you haven't started yet. But to actually check the results of the validation, you need to call `get()` on the future, so we need to make sure that `get()` won't block forever when there is no failure, as it currently does.
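The behaviour this PR describes can be modelled with a small sketch. If the app is still in CREATED, no stream thread will ever acknowledge the topology change. So the future is completed right after validation instead of being left pending. `MiniApp` and its `State` enum are hypothetical, and `CompletableFuture` stands in for Kafka's internal `KafkaFutureImpl`. The real wrapper runs more validation than the duplicate-name check used here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the fix; CompletableFuture stands in for Kafka's KafkaFutureImpl.
final class MiniApp {
    enum State { CREATED, RUNNING }

    private State state = State.CREATED;
    private final Set<String> names = new HashSet<>();

    synchronized void start() {
        state = State.RUNNING;
    }

    synchronized CompletableFuture<Void> addTopology(String name) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (!names.add(name)) {
            // Validation failures are reported the same way in every state.
            future.completeExceptionally(new IllegalArgumentException("duplicate topology: " + name));
            return future;
        }
        if (state == State.CREATED) {
            // No stream threads exist yet to acknowledge the change, so complete now;
            // otherwise get() would block forever.
            future.complete(null);
        }
        // In RUNNING, the future would be completed later, once the threads pick up the change.
        return future;
    }
}
```

With this model, a caller can validate a topology against an unstarted app by calling `addTopology("t1").get()` without hanging.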
[GitHub] [kafka] justinrlee commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured
justinrlee commented on pull request #11811: URL: https://github.com/apache/kafka/pull/11811#issuecomment-1051595309 (Also, I'm not sure whether we'd want a similar PR for the 3.1 branch.)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff
ableegoldman commented on a change in pull request #11787: URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; + +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; + +/** + * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is + * shared between all StreamThreads. 
+ */ +public class TaskExecutionMetadata { +private final boolean hasNamedTopologies; +// map of topologies experiencing errors/currently under backoff +private final ConcurrentHashMap topologyNameToErrorMetadata = new ConcurrentHashMap<>(); + +public TaskExecutionMetadata(final Set allTopologyNames) { +this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); +} + +public boolean canProcessTask(final Task task, final long now) { +final String topologyName = task.id().topologyName(); +if (!hasNamedTopologies) { +// TODO implement error handling/backoff for non-named topologies (needs KIP) +return true; +} else { +final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); +return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); +} +} + +public void registerTaskError(final Task task, final Throwable t, final long now) { +if (hasNamedTopologies) { +final String topologyName = task.id().topologyName(); +topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName)) +.registerTaskError(task, t, now); +} +} + +class NamedTopologyMetadata { +private final Logger log; +private final Map tasksToErrorTime = new ConcurrentHashMap<>(); + +public NamedTopologyMetadata(final String topologyName) { +final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName)); +this.log = logContext.logger(NamedTopologyMetadata.class); +} + +public boolean canProcess() { +// TODO: during long task backoffs, pause the full topology to avoid it getting out of sync +return true; +} + +public boolean canProcessTask(final Task task, final long now) { +// TODO: implement exponential backoff, for now we just wait 15s +final Long errorTime = tasksToErrorTime.get(task.id()); +if (errorTime == null) { +return true; +} else if (now - errorTime > 15000L) { Review comment: Because it was actually taking the thread 10s to come 
back up (in the integration test where we overrode `session.timeout` to 10s) before we had https://github.com/apache/kafka/pull/11801. Now, with that fix, it takes 0.5 to 4s for the thread to be replaced, so there's no particular reason to keep it at 15s. I think it makes sense to lower it to maybe 5s for now; then, when we have true exponential backoff, it can start lower and grow from there.
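The constant-time backoff in `canProcessTask` above reduces to remembering when each task last failed and refusing to process it until a fixed interval has passed. In this simplified Java sketch, task ids are plain strings and the interval is a constructor parameter, so it could be 5s as suggested instead of the hard-coded `15000L`. Clearing the entry once the backoff elapses is an assumption, because the original `else if` branch is cut off in the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the constant-time task backoff; task ids are plain strings here.
final class ConstantBackoff {
    private final long backoffMs;
    private final Map<String, Long> taskToErrorTime = new ConcurrentHashMap<>();

    ConstantBackoff(long backoffMs) {
        this.backoffMs = backoffMs;
    }

    void registerTaskError(String taskId, long nowMs) {
        taskToErrorTime.put(taskId, nowMs); // latest error time wins
    }

    boolean canProcessTask(String taskId, long nowMs) {
        Long errorTime = taskToErrorTime.get(taskId);
        if (errorTime == null) {
            return true; // task has no recorded error
        }
        if (nowMs - errorTime > backoffMs) { // strict '>' mirrors the reviewed code
            taskToErrorTime.remove(taskId);  // assumption: forget the error once the backoff has elapsed
            return true;
        }
        return false; // still backing off
    }
}
```

Using `ConcurrentHashMap` matches the reviewed class, whose single instance is shared by all StreamThreads.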
[GitHub] [kafka] ableegoldman opened a new pull request #11812: KAFKA-12738: address minor followup and consolidate integration tests of PR #11787
ableegoldman opened a new pull request #11812: URL: https://github.com/apache/kafka/pull/11812 This PR addresses the remaining nits from the final review of https://github.com/apache/kafka/pull/11787. It also deletes two integration test classes that each contained only one test, and moves those tests into another test class to save the time needed to bring up an entire embedded Kafka cluster for a single run.
[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff
ableegoldman commented on a change in pull request #11787: URL: https://github.com/apache/kafka/pull/11787#discussion_r815265785 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +@Category(IntegrationTest.class) +public class ErrorHandlingIntegrationTest { + +private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + +@Rule +public TestName testName = new TestName(); + +private final String testId = safeUniqueTestName(getClass(), testName); +private final String appId = "appId_" + testId; +private final Properties properties = props(); + +// Task 0 +private final String inputTopic = "input" + testId; +private final String outputTopic = "output" + testId; +// Task 1 +private final String errorInputTopic = "error-input" + testId; +private final String errorOutputTopic = "error-output" + testId; + +@Before +public void setup() { +IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic); +} + +private Properties props() { +return mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()), +mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), +mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L), +mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), +mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), +mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1), +mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1) +) +); +} + +@Test +public void shouldBackOffTaskAndEmitDataWithinSameTopology()
[GitHub] [kafka] justinrlee opened a new pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured
justinrlee opened a new pull request #11811: URL: https://github.com/apache/kafka/pull/11811 Single-line change to `build.gradle` to render Javadocs for the new `org.apache.kafka.common.security.oauthbearer.secured` package (part of [KIP-768](https://issues.apache.org/jira/browse/KAFKA-13202)) cc @junrao
[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff
ableegoldman commented on a change in pull request #11787: URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; + +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; + +/** + * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is + * shared between all StreamThreads. 
+ */ +public class TaskExecutionMetadata { +private final boolean hasNamedTopologies; +// map of topologies experiencing errors/currently under backoff +private final ConcurrentHashMap topologyNameToErrorMetadata = new ConcurrentHashMap<>(); + +public TaskExecutionMetadata(final Set allTopologyNames) { +this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); +} + +public boolean canProcessTask(final Task task, final long now) { +final String topologyName = task.id().topologyName(); +if (!hasNamedTopologies) { +// TODO implement error handling/backoff for non-named topologies (needs KIP) +return true; +} else { +final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); +return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); +} +} + +public void registerTaskError(final Task task, final Throwable t, final long now) { +if (hasNamedTopologies) { +final String topologyName = task.id().topologyName(); +topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName)) +.registerTaskError(task, t, now); +} +} + +class NamedTopologyMetadata { +private final Logger log; +private final Map tasksToErrorTime = new ConcurrentHashMap<>(); + +public NamedTopologyMetadata(final String topologyName) { +final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName)); +this.log = logContext.logger(NamedTopologyMetadata.class); +} + +public boolean canProcess() { +// TODO: during long task backoffs, pause the full topology to avoid it getting out of sync +return true; +} + +public boolean canProcessTask(final Task task, final long now) { +// TODO: implement exponential backoff, for now we just wait 15s +final Long errorTime = tasksToErrorTime.get(task.id()); +if (errorTime == null) { +return true; +} else if (now - errorTime > 15000L) { Review comment: Because it was actually taking the thread 10s to come 
back up (in the integration test where we overrode `session.timeout` to 10s) before we had https://github.com/apache/kafka/pull/11801. Now, with that fix, it takes under 0.5s for the thread to be replaced, so there's no particular reason to keep it at 15s. I think it makes sense to lower it to maybe 5s for now; then, when we have true exponential backoff, it can start lower and grow from there.
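The comment anticipates a true exponential backoff that "can start lower and grow from there". Here is one possible shape, as a hypothetical sketch that is not taken from the PR. The delay is `min(initialMs * 2^(failures - 1), maxMs)`, counted over consecutive failures. A success resets the count. `ExponentialBackoff`, `registerTaskSuccess`, and the doubling factor are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task exponential backoff: delay = min(initialMs * 2^(failures - 1), maxMs).
final class ExponentialBackoff {
    private final long initialMs;
    private final long maxMs;
    private final Map<String, Integer> consecutiveFailures = new HashMap<>();
    private final Map<String, Long> lastErrorMs = new HashMap<>();

    ExponentialBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    synchronized void registerTaskError(String taskId, long nowMs) {
        consecutiveFailures.merge(taskId, 1, Integer::sum);
        lastErrorMs.put(taskId, nowMs);
    }

    // Hypothetical: a successful run resets the task's backoff.
    synchronized void registerTaskSuccess(String taskId) {
        consecutiveFailures.remove(taskId);
        lastErrorMs.remove(taskId);
    }

    synchronized long currentBackoffMs(String taskId) {
        int failures = consecutiveFailures.getOrDefault(taskId, 0);
        if (failures == 0) {
            return 0L;
        }
        long delay = initialMs;
        for (int i = 1; i < failures && delay < maxMs; i++) {
            delay *= 2; // double once per additional consecutive failure; stop early to avoid overflow
        }
        return Math.min(delay, maxMs);
    }

    synchronized boolean canProcessTask(String taskId, long nowMs) {
        Long last = lastErrorMs.get(taskId);
        return last == null || nowMs - last > currentBackoffMs(taskId);
    }
}
```

Capping the delay at `maxMs` keeps a repeatedly failing task from being parked indefinitely. Stopping the doubling loop once the cap is reached also prevents `long` overflow.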
[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
showuon commented on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051554030 > However, after valuesWithPrefixOverride.get("sasl.mechanism") add assertFalse(config.unknown().contains("prefix.sasl.mechanism"));, testCase will fail to verify; because unknownKeys is only affected by originals and values, the value is originalKeys.removeAll(valueKeys). So, could we also remove the `used` config keys from the `unknown` configs? Otherwise, we would have the strange case where a config is `used` but `unknown` to Kafka. Thanks.
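The suggestion above amounts to one extra set subtraction: start from `originalKeys.removeAll(valueKeys)`, then also remove every key that was actually read. This sketch models the proposal with plain sets. The `used` parameter is an assumed input standing in for whatever used-key tracking the config class has. The snippet is not the merged implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the proposal: a key that was actually used should not be reported as unknown.
final class ProposedUnknownKeys {
    static Set<String> unknown(Map<String, ?> originals, Map<String, ?> values, Set<String> used) {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(values.keySet()); // current rule: not defined in the ConfigDef
        keys.removeAll(used);            // proposed rule: also drop keys that were read
        return keys;
    }
}
```

In the test scenario from the thread, `prefix.sasl.mechanism` is read through `valuesWithPrefixOverride`, so it would no longer appear in `unknown`, and `assertFalse(config.unknown().contains("prefix.sasl.mechanism"))` would hold.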
[jira] [Comment Edited] (KAFKA-13514) Flakey test StickyAssignorTest
[ https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498387#comment-17498387 ] Luke Chen edited comment on KAFKA-13514 at 2/26/22, 3:38 AM: - [~ableegoldman] , I agree with you that if this actually results in the two consumers taking ownership of the same partition at the same time, that would be a severe bug, and we should fix it soon! I made this assumption because of the log output in this Jira ticket's description. However, I have also seen this test fail in Jenkins since this ticket was opened, but without error logs like the ones in this ticket. It's just slow. I'm still waiting to see a failed case so I can check the complete logs for the assignment error. > I'm pretty sure we haven't touched the general assignment algorithm Right, we didn't touch or change the general assignment algorithm, so this bug has been there since the code was first written. Do you have the link to the failed Jenkins test so I can check the complete log? Thank you. was (Author: showuon): [~ableegoldman] , I agree with you that if this actually results in the two consumers taking ownership of the same partition at the same time, that would be a severe bug, and we should fix it soon! I made this assumption because of the log output in this Jira ticket's description. However, I have also seen this error since this ticket was opened, without error logs like the ones in this ticket. It's just slow. I'm still waiting to see a failed case so I can check the complete logs. > I'm pretty sure we haven't touched the general assignment algorithm Right, we didn't touch or change the general assignment algorithm, so this bug has been there since the code was first written. Do you have the link to the failed Jenkins test so I can check the complete log? Thank you. > Flakey test StickyAssignorTest > -- > > Key: KAFKA-13514 > URL: https://issues.apache.org/jira/browse/KAFKA-13514 > Project: Kafka > Issue Type: Test > Components: clients, unit tests >Reporter: Matthias J. 
Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription() > No real stack trace, but only: > {quote}java.util.concurrent.TimeoutException: > testLargeAssignmentAndGroupWithNonEqualSubscription() timed out after 60 > seconds{quote} > STDOUT > {quote}[2021-12-07 01:32:23,920] ERROR Found multiple consumers consumer1 and > consumer2 claiming the same TopicPartition topic-0 in the same generation -1, > this will be invalidated and removed from their previous assignment. > (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) > [2021-12-07 01:32:58,964] ERROR Found multiple consumers consumer1 and > consumer2 claiming the same TopicPartition topic-0 in the same generation -1, > this will be invalidated and removed from their previous assignment. > (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) > [2021-12-07 01:32:58,976] ERROR Found multiple consumers consumer1 and > consumer2 claiming the same TopicPartition topic-0 in the same generation -1, > this will be invalidated and removed from their previous assignment. > (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150){quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
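The ERROR lines above say that when two consumers claim the same TopicPartition in the same generation, the claim "will be invalidated and removed from their previous assignment". Below is a minimal sketch of that kind of check, assuming partitions are plain strings such as `topic-0` and that each consumer reports a single generation. The class and method names are hypothetical and this is not the AbstractStickyAssignor code; claims made in different generations are simply left untouched here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of the "same partition, same generation" check described by
// the ERROR log. Partitions are plain strings; this is not AbstractStickyAssignor's implementation.
public final class DuplicateClaimSketch {

    private DuplicateClaimSketch() {}

    /**
     * Finds partitions that two or more consumers claim in the same generation, removes them
     * from every claimant's owned list (the lists are mutated in place), and returns them.
     */
    public static Set<String> invalidateDuplicateClaims(Map<String, List<String>> ownedPartitions,
                                                        Map<String, Integer> generations) {
        // "partition@generation" -> first consumer seen claiming it
        Map<String, String> firstClaimant = new HashMap<>();
        Set<String> duplicateKeys = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : ownedPartitions.entrySet()) {
            int generation = generations.get(entry.getKey());
            for (String partition : entry.getValue()) {
                String key = partition + "@" + generation;
                String previous = firstClaimant.putIfAbsent(key, entry.getKey());
                if (previous != null && !previous.equals(entry.getKey())) {
                    duplicateKeys.add(key);
                }
            }
        }
        Set<String> invalidated = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : ownedPartitions.entrySet()) {
            int generation = generations.get(entry.getKey());
            // Drop the partition from every consumer that claimed it in the conflicting generation.
            entry.getValue().removeIf(partition -> {
                boolean duplicate = duplicateKeys.contains(partition + "@" + generation);
                if (duplicate) {
                    invalidated.add(partition);
                }
                return duplicate;
            });
        }
        return invalidated;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("consumer1", new ArrayList<>(List.of("topic-0", "topic-1")));
        owned.put("consumer2", new ArrayList<>(List.of("topic-0", "topic-2")));
        Set<String> invalidated = invalidateDuplicateClaims(owned, Map.of("consumer1", -1, "consumer2", -1));
        System.out.println(invalidated);            // [topic-0]
        System.out.println(owned.get("consumer1")); // [topic-1]
        System.out.println(owned.get("consumer2")); // [topic-2]
    }
}
```

Invalidating the partition for both claimants (rather than picking a winner) keeps the sketch simple; the partition then goes back into the pool of unassigned partitions.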
[jira] [Commented] (KAFKA-13514) Flakey test StickyAssignorTest
[ https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498387#comment-17498387 ] Luke Chen commented on KAFKA-13514: --- [~ableegoldman], I agree with you that if this actually results in two consumers taking ownership of the same partition at the same time, that would be a severe bug, and we should fix it soon! I made this assumption because of the log output in this Jira ticket's description. However, I have also seen this error since this ticket was opened, without any error logs like the ones in this ticket; it was just slow. I'm still waiting to see a failed case and check the complete logs. > I'm pretty sure we haven't touched the general assignment algorithm Right, we didn't touch or change the general assignment algorithm, so this bug has been there since the algorithm was first written. Do you have the Jenkins link for the failed test so I can check the complete log? Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts
showuon commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r815260473

## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
## @@ -205,25 +208,31 @@
 /** metric.reporters */
 public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

+// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
+private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+
 /** max.in.flight.requests.per.connection */
 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
 + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of"
-+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
-// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
-private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
++ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
++ " Note additionally that enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
++ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";

 /** retries */
 public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
 private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
 + " Note that this retry is no different than if the client resent the record upon receiving the error."
-+ " Allowing retries without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the"
++ " Allowing retries and disabling enable.idempotence but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the"

Review comment:
Side fix: currently, the doc says:
```
Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records
```
which is no longer quite right, because it was written before idempotence was available. Updated to:
```
Allowing retries and disabling enable.idempotence but without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
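The corrected RETRIES_DOC wording boils down to one condition: records can be reordered only when retries are allowed, idempotence is off, and more than one request may be in flight per connection. A small sketch of that condition as a predicate; `OrderingRisk` and `mayReorder` are hypothetical names (not part of ProducerConfig), and `enableIdempotence` is assumed to be the effective value after config validation.

```java
// Hypothetical helper expressing the ordering rule from the RETRIES_DOC text; not a Kafka API.
public final class OrderingRisk {

    private OrderingRisk() {}

    /**
     * True when a failed send may be retried after a later request on the same connection has
     * already succeeded, which is what reorders records.
     */
    public static boolean mayReorder(int retries, boolean enableIdempotence, int maxInFlightPerConnection) {
        boolean retriesAllowed = retries > 0;
        // With the idempotent producer, per-partition sequence numbers keep ordering intact
        // (as long as max.in.flight.requests.per.connection <= 5); without it, only a single
        // in-flight request per connection is safe.
        return retriesAllowed && !enableIdempotence && maxInFlightPerConnection > 1;
    }

    public static void main(String[] args) {
        System.out.println(mayReorder(Integer.MAX_VALUE, false, 5)); // true
        System.out.println(mayReorder(Integer.MAX_VALUE, false, 1)); // false
        System.out.println(mayReorder(Integer.MAX_VALUE, true, 5));  // false
    }
}
```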
[GitHub] [kafka] showuon commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts
showuon commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r815260093

## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
## @@ -461,27 +467,53 @@ private void postProcessAndValidateIdempotenceConfigs(final Map
 final Map originalConfigs = this.originals();
 final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
 configs.put(ACKS_CONFIG, acksStr);
-
-// For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
-if (idempotenceEnabled()) {
-boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
-if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+boolean shouldDisableIdempotence = false;
+
+// For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
+if (idempotenceEnabled) {
+final int retries = this.getInt(RETRIES_CONFIG);
+if (retries == 0) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+}
+log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries);
+shouldDisableIdempotence = true;
 }
-boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
 final short acks = Short.valueOf(acksStr);
-if (userConfiguredAcks && acks != (short) -1) {
-throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+if (acks != (short) -1) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
 "producer. Otherwise we cannot guarantee idempotence.");
+}
+log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks);
+shouldDisableIdempotence = true;
 }
-boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
 " to use the idempotent producer.");
+}
+log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
+"Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+shouldDisableIdempotence = true;
 }
 }
+
+if (shouldDisableIdempotence) {
+configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+}
+
+// validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden
+idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence;

Review comment:
Good suggestion! Updated.
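The validation in this diff follows one rule: if the user explicitly set enable.idempotence, a conflicting retries, acks, or max.in.flight.requests.per.connection value is an error; if idempotence is only on by default, it is turned off instead. A simplified model of that decision, assuming the values are already parsed (acks "all" mapped to -1). `IdempotenceResolver` is hypothetical, and IllegalArgumentException stands in for Kafka's ConfigException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the idempotence validation in the diff; it is not
// ProducerConfig, and IllegalArgumentException stands in for Kafka's ConfigException.
public final class IdempotenceResolver {

    static final int MAX_IN_FLIGHT_FOR_IDEMPOTENCE = 5;

    /** Effective idempotence setting plus the reasons it was turned off, if any. */
    public static final class Result {
        public final boolean idempotenceEnabled;
        public final List<String> reasons;

        Result(boolean idempotenceEnabled, List<String> reasons) {
            this.idempotenceEnabled = idempotenceEnabled;
            this.reasons = reasons;
        }
    }

    private IdempotenceResolver() {}

    /**
     * @param userSetIdempotence whether enable.idempotence appears in the user's original configs
     * @param idempotence        parsed enable.idempotence value (true by default)
     * @param retries            parsed retries value
     * @param acks               parsed acks value, with "all" already mapped to -1
     * @param maxInFlight        parsed max.in.flight.requests.per.connection value
     */
    public static Result resolve(boolean userSetIdempotence, boolean idempotence,
                                 int retries, short acks, int maxInFlight) {
        List<String> conflicts = new ArrayList<>();
        if (idempotence) {
            if (retries == 0) {
                conflicts.add("retries is set to 0");
            }
            if (acks != (short) -1) {
                conflicts.add("acks is set to " + acks + ", not 'all'");
            }
            if (maxInFlight > MAX_IN_FLIGHT_FOR_IDEMPOTENCE) {
                conflicts.add("max.in.flight.requests.per.connection is set to " + maxInFlight + ", which is greater than 5");
            }
        }
        if (conflicts.isEmpty()) {
            return new Result(idempotence, conflicts);
        }
        if (userSetIdempotence) {
            // Idempotence was requested explicitly, so a conflicting setting is a hard error.
            throw new IllegalArgumentException("Cannot use the idempotent producer: " + conflicts.get(0));
        }
        // Idempotence was only on by default, so it is disabled instead of failing.
        return new Result(false, conflicts);
    }

    public static void main(String[] args) {
        Result r = resolve(false, true, Integer.MAX_VALUE, (short) 1, 5);
        System.out.println(r.idempotenceEnabled + " " + r.reasons); // false [acks is set to 1, not 'all']
    }
}
```

Collecting every conflict (instead of stopping at the first) mirrors the diff's behavior of logging each reason when idempotence is silently disabled.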
[GitHub] [kafka] ableegoldman commented on pull request #11810: KAFKA-13281: can remove topologies while in a created state
ableegoldman commented on pull request #11810: URL: https://github.com/apache/kafka/pull/11810#issuecomment-1051526955 Merged to trunk.
[GitHub] [kafka] ableegoldman merged pull request #11810: KAFKA-13281: can remove topologies while in a created state
ableegoldman merged pull request #11810: URL: https://github.com/apache/kafka/pull/11810
[GitHub] [kafka] ableegoldman commented on pull request #11810: KAFKA-13281: can remove topologies while in a created state
ableegoldman commented on pull request #11810: URL: https://github.com/apache/kafka/pull/11810#issuecomment-1051525618 All test failures are unrelated; merging to trunk.
[jira] [Commented] (KAFKA-13514) Flakey test StickyAssignorTest
[ https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498386#comment-17498386 ] A. Sophie Blee-Goldman commented on KAFKA-13514: Hey [~showuon], just checking in since I saw this fail again: you're saying there's a bug in the generalAssign case that's causing it to assign the same partition to more than one consumer? If so, what's happening in this case? Do we actually catch this bug somewhere in the assignor or coordinator, or would this actually result in the two consumers taking ownership of the same partition at the same time? If the latter, that's a pretty severe bug, and we should absolutely be trying to get a fix in, possibly as a blocker for the next release (of course, AFAICT the general case where consumers have unequal subscriptions is extremely rare, but it's still a correctness bug and thus critical). Do you have any sense of whether this was a newly introduced bug, or has it been there since the sticky assignment algorithm was first written? I'm pretty sure we haven't touched the general assignment algorithm, so my guess is it was always there, and thus maybe not a blocker, but I'm just wondering.
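The question above is whether anything catches a partition being handed to two consumers. One way to make such a bug fail loudly in a test is a post-assignment check like the sketch below. `AssignmentValidator` is a hypothetical helper (not an existing Kafka API), and partitions are modeled as plain strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical post-assignment sanity check; not an existing Kafka API.
public final class AssignmentValidator {

    private AssignmentValidator() {}

    /** Throws IllegalStateException if any partition appears in more than one consumer's assignment. */
    public static void assertNoSharedPartitions(Map<String, List<String>> assignment) {
        // partition -> the first consumer it was assigned to
        Map<String, String> ownerByPartition = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            for (String partition : entry.getValue()) {
                String previousOwner = ownerByPartition.putIfAbsent(partition, entry.getKey());
                if (previousOwner != null) {
                    throw new IllegalStateException("Partition " + partition + " is assigned to both "
                            + previousOwner + " and " + entry.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        assertNoSharedPartitions(Map.of("consumer1", List.of("topic-0"), "consumer2", List.of("topic-1")));
        try {
            assertNoSharedPartitions(Map.of("consumer1", List.of("topic-0"), "consumer2", List.of("topic-0")));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```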
[GitHub] [kafka] ableegoldman merged pull request #11808: KAFKA-13281: list all named topologies
ableegoldman merged pull request #11808: URL: https://github.com/apache/kafka/pull/11808
[GitHub] [kafka] lihaosky commented on pull request #11804: KAFKA-13542: add rebalance reason in Kafka Streams
lihaosky commented on pull request #11804: URL: https://github.com/apache/kafka/pull/11804#issuecomment-1051403295 Hi @showuon, I see a lot of failed checks here and in other merged PRs. Is `tests/Build/ARM` the only relevant check?
[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away
[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498369#comment-17498369 ] RivenSun commented on KAFKA-13694: -- Hi @guozhang wang, I went through the code again. I agree with you that recordErrors are already stored in the ProduceResponse. Most likely, when the KafkaProducer processes the ProduceResponse, it does not pass recordErrors on to the user's Callback. I was out today; I will analyze this further later. Thanks. > Some InvalidRecordException messages are thrown away > > > Key: KAFKA-13694 > URL: https://issues.apache.org/jira/browse/KAFKA-13694 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.0.0 >Reporter: RivenSun >Priority: Major > > 1.Example > Topic level config:"cleanup.policy":"compact" > But when the producer sends the message, the ProducerRecord does not specify > the key. > > producer.log > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One > or more records have been rejected {code} > > > server.log > {code:java} > [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing > append operation on partition rivenTest4-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: One or more records have been > rejected {code} > Through the logs of the producer and server, we do not know the reason for > the failure of sending, only that the message was rejected by the server.
> You can compare the RecordTooLargeException testCase, we can clearly know the > reason for the failure from the producer, and the server will not print the > log (the reason will be explained later) > producer_message_too_large.log : > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. {code} > 2.RootCause > ReplicaManager#appendToLocalLog(...) -> > Partition#appendRecordsToLeader(...) -> > UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) -> > LogValidator#validateMessagesAndAssignOffsets(...) > 1) Analyze the validateMessagesAndAssignOffsets method, > In the LogValidator#validateRecord method, validateKey and validateTimestamp > are called, and the error information of all messages is obtained: > Seq[ApiRecordError]; > In the subsequent processRecordErrors(recordErrors) method, currently only > special processing is done for Errors.INVALID_TIMESTAMP, because the ERROR > returned by validateKey is still the ordinary Errors.INVALID_RECORD, so the > code will run to > {code:java} > else { > throw new RecordValidationException(new InvalidRecordException( > "One or more records have been rejected"), errors) > }{code} > In fact, the *errors* variable here contains the specific information of each > recordError, but we did not put the errors information into the message of > InvalidRecordException. > 2).The exception thrown by processRecordErrors will be caught by > ReplicaManager#appendToLocalLog(...), we continue to analyze the > `catchException code` of appendToLocalLog. > Here, we can know the RecordTooLargeException, why the server does not print > the log. 
> Under case rve: RecordValidationException, > The server prints the log: processFailedRecord method, > and sends a response to the client: LogAppendResult method > In these two methods, we can find that we only use rve.invalidException, > For rve.recordErrors, the server neither prints it nor returns it to the > client. > 3.Solution > Two solutions, I prefer the second > 1)Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns > Errors.INVALID_RECORD_WITHOUT_KEY, > In the processRecordErrors method, also do special processing for > Errors.INVALID_RECORD_WITHOUT_KEY > 2)Modify the logic of the processRecordErrors method, no longer distinguish > the types of Errors, and {*}Even if new INVALID_RECORD types will be added in > the future{*}, we uniformly return: > {code:java} > throw new RecordValidationException(new InvalidRecordException( > "One or more records have been rejected due to " + errors.toString()), > errors) {code} > Also need to add toString() method for ProduceResponse.RecordError class > {code:java} > @Override > public String toString() { > return "RecordError(" > + "batchIndex=" + batchIndex > + ",
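Solution 2 above proposes folding the per-record errors into the InvalidRecordException message and giving ProduceResponse.RecordError a toString(). The broker side is Scala, so the Java sketch below only models how that message would be built. `RecordErrorMessageSketch` and its nested `RecordError` are stand-ins assumed to carry a batch index and a message, and the toString format is an assumption because the proposal's own version is cut off in the quoted text.

```java
import java.util.List;

// Hypothetical stand-ins for illustration: RecordError is assumed to carry a batch index and a
// message like ProduceResponse.RecordError, but this is not the real class or the broker code.
public final class RecordErrorMessageSketch {

    public static final class RecordError {
        public final int batchIndex;
        public final String message; // may be null

        public RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        @Override
        public String toString() {
            // Assumed format; the proposal's own toString() is truncated in the quoted text.
            return "RecordError(batchIndex=" + batchIndex + ", message=" + message + ")";
        }
    }

    private RecordErrorMessageSketch() {}

    /** Solution 2: always include every per-record error, whatever its Errors type. */
    public static String rejectionMessage(List<RecordError> errors) {
        return "One or more records have been rejected due to " + errors;
    }

    public static void main(String[] args) {
        List<RecordError> errors = List.of(new RecordError(0, "illustrative: record has no key"));
        System.out.println(rejectionMessage(errors));
        // One or more records have been rejected due to [RecordError(batchIndex=0, message=illustrative: record has no key)]
    }
}
```

Because the message no longer depends on which Errors value each record produced, new INVALID_RECORD variants would be reported without further special cases, which is the argument the proposal makes for option 2.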
[GitHub] [kafka] wcarlson5 opened a new pull request #11810: KAFKA-13281: can remove topologies while in a created state
wcarlson5 opened a new pull request #11810: URL: https://github.com/apache/kafka/pull/11810 We should be able to change the topologies while in a created state, including removing them. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
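PR #11810 lets named topologies be removed while the client is still in a created state. The sketch below shows what state-gated removal could look like. The class, its three states, and the rule that removal is rejected after close() are all hypothetical simplifications, not the KafkaStreamsNamedTopologyWrapper implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of state-gated topology removal; the class, states, and rules are
// illustrative only and are not KafkaStreamsNamedTopologyWrapper's real implementation.
public final class NamedTopologyRegistry {

    public enum State { CREATED, RUNNING, NOT_RUNNING }

    private final Set<String> topologies = new LinkedHashSet<>();
    private State state = State.CREATED;

    public void start() { state = State.RUNNING; }

    public void close() { state = State.NOT_RUNNING; }

    public void addTopology(String name) {
        if (!topologies.add(name)) {
            throw new IllegalArgumentException("Topology " + name + " already exists");
        }
    }

    /** Removal is permitted before start() (CREATED) as well as while RUNNING. */
    public boolean removeTopology(String name) {
        if (state == State.NOT_RUNNING) {
            throw new IllegalStateException("Cannot remove topology " + name + " in state " + state);
        }
        // In CREATED there are no stream threads yet, so dropping the entry is all that is needed;
        // a RUNNING client would also have to coordinate with its threads (omitted here).
        return topologies.remove(name);
    }

    public List<String> topologyNames() { return new ArrayList<>(topologies); }
}
```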
[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away
[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498364#comment-17498364 ] RivenSun commented on KAFKA-13694: -- Hi [~guozhang], thank you for your reply. The KafkaProducer code I tested locally is as follows:
{code:java}
ProducerRecord record = new ProducerRecord("rivenTest4", System.currentTimeMillis() + value);
Callback callback = new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            LOG.error("the producer has a error:" + e.getMessage());
        else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
            System.out.println("The partition of the record we just sent is: " + metadata.partition());
        }
    }
};
producer.send(record, callback);
{code}
The KafkaProducer does not know the specific reason why the send failed.
producer.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or more records have been rejected
{code}
You can read the source code of the catch block at the end of the ReplicaManager#appendToLocalLog(...) method:
{code:java}
catch {
  // NOTE: Failed produce requests metric is not incremented for known exceptions
  // it is supposed to indicate un-expected failures of a broker in handling a produce request
  case e@ (_: UnknownTopicOrPartitionException |
           _: NotLeaderOrFollowerException |
           _: RecordTooLargeException |
           _: RecordBatchTooLargeException |
           _: CorruptRecordException |
           _: KafkaStorageException) =>
    (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
  case rve: RecordValidationException =>
    val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
    val recordErrors = rve.recordErrors
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
      logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
  case t: Throwable =>
    val logStartOffset = processFailedRecord(topicPartition, t)
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
}
{code}
In the case rve: RecordValidationException, the server only uses rve.invalidException when printing the log and when *filling the exception* field in LogAppendResult. I understand that rve.recordErrors is indeed used when constructing the LogAppendInfo field of LogAppendResult, but the Kafka client never sees the recordErrors in LogAppendInfo. Compare it with the following code:
{code:java}
case e@ (_: UnknownTopicOrPartitionException |
         _: NotLeaderOrFollowerException |
         _: RecordTooLargeException |
         _: RecordBatchTooLargeException |
         _: CorruptRecordException |
         _: KafkaStorageException) =>
  (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
{code}
In this code, the server does not pass the recordErrors parameter when constructing LogAppendInfo.UnknownLogAppendInfo, yet the producer still learns the specific reason for the failure.
producer2.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept.
{code}
[GitHub] [kafka] wcarlson5 commented on a change in pull request #11808: KAFKA-13281: list all named topologies
wcarlson5 commented on a change in pull request #11808:
URL: https://github.com/apache/kafka/pull/11808#discussion_r815227626

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
## @@ -149,6 +149,10 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
 }
+public Collection getAllTopologies() {
+return getAllTopologies();

Review comment:
Oops, I thought I was calling that on the topology metadata. Fixed!
[GitHub] [kafka] cmccabe merged pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
cmccabe merged pull request #11806: URL: https://github.com/apache/kafka/pull/11806
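Going by the title of KAFKA-13697, an ACL whose operation is AclOperation.ALL should match any requested operation. A minimal sketch of that matching rule; `Op` is a stand-in for a subset of AclOperation and `aclCovers` is a hypothetical helper. Implied operations (for example DESCRIBE being implied by READ) are deliberately left out.

```java
// Hypothetical sketch. Op is a stand-in for a subset of org.apache.kafka.common.acl.AclOperation,
// and the rule below only covers the ALL wildcard; implied operations are omitted.
public final class AclOperationMatchSketch {

    public enum Op { ALL, READ, WRITE, DESCRIBE, ALTER }

    private AclOperationMatchSketch() {}

    /** An ACL entry covers the requested operation if it names that operation or ALL. */
    public static boolean aclCovers(Op aclOperation, Op requested) {
        return aclOperation == Op.ALL || aclOperation == requested;
    }

    public static void main(String[] args) {
        System.out.println(aclCovers(Op.ALL, Op.WRITE));  // true
        System.out.println(aclCovers(Op.READ, Op.WRITE)); // false
    }
}
```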
[GitHub] [kafka] cmccabe opened a new pull request #11809: MINOR: create KafkaConfigSchema and TimelineObject
cmccabe opened a new pull request #11809: URL: https://github.com/apache/kafka/pull/11809 Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys. This is useful in the controller because we can't import KafkaConfig, which is part of core. Also introduce the TimelineObject class, which is a more generic version of TimelineInteger / TimelineLong.
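TimelineObject is described as a more generic version of TimelineInteger / TimelineLong, that is, a snapshot-able value that works for any type. The sketch below illustrates the idea with epoch-versioned writes, snapshot reads, and revert. `SimpleTimelineObject` and its methods are hypothetical; the real class is driven by Kafka's SnapshotRegistry rather than by explicit epochs passed to set().

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified sketch of a generic epoch-versioned value. Kafka's real
// org.apache.kafka.timeline.TimelineObject is driven by a SnapshotRegistry; this is not its API.
public final class SimpleTimelineObject<T> {

    // epoch at which each value was written -> value; Long.MIN_VALUE holds the initial value
    private final TreeMap<Long, T> history = new TreeMap<>();

    public SimpleTimelineObject(T initialValue) {
        history.put(Long.MIN_VALUE, initialValue);
    }

    /** Writes a value as of the given epoch; epochs must not go backwards. */
    public void set(long epoch, T value) {
        if (epoch < history.lastKey()) {
            throw new IllegalArgumentException("Epoch " + epoch + " is older than " + history.lastKey());
        }
        history.put(epoch, value);
    }

    /** Latest value. */
    public T get() {
        return history.lastEntry().getValue();
    }

    /** Value that was visible at the given epoch (a snapshot read). */
    public T get(long epoch) {
        Map.Entry<Long, T> entry = history.floorEntry(epoch);
        return entry.getValue(); // never null thanks to the Long.MIN_VALUE entry
    }

    /** Discards every write made after the given epoch. */
    public void revertTo(long epoch) {
        history.tailMap(epoch, false).clear();
    }

    public static void main(String[] args) {
        SimpleTimelineObject<String> leader = new SimpleTimelineObject<>("none");
        leader.set(10, "broker-1");
        leader.set(20, "broker-2");
        System.out.println(leader.get(15)); // broker-1
        System.out.println(leader.get());   // broker-2
        leader.revertTo(10);
        System.out.println(leader.get());   // broker-1
    }
}
```

Making the value type a generic parameter is what removes the need for one class per primitive type.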
[GitHub] [kafka] ableegoldman commented on a change in pull request #11808: KAFKA-13281: list all named topologies
ableegoldman commented on a change in pull request #11808:
URL: https://github.com/apache/kafka/pull/11808#discussion_r815205312

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
## @@ -149,6 +149,10 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
 }
+public Collection getAllTopologies() {
+return getAllTopologies();

Review comment:
this looks like an infinite loop?
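The reviewer's point is that `getAllTopologies()` calls itself, so any call recurses until the JVM throws StackOverflowError; the fix is to delegate to the topology metadata instead. Below is a sketch of the corrected shape. `TopologyMetadataStub` and `allNamedTopologies()` are hypothetical stand-ins, since the thread does not show which TopologyMetadata method the fix calls.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: TopologyMetadataStub stands in for the wrapper's topologyMetadata field,
// and its method name is invented for illustration.
public final class NamedTopologyWrapperSketch {

    public static final class TopologyMetadataStub {
        private final Set<String> namedTopologies = new LinkedHashSet<>();

        public void register(String name) { namedTopologies.add(name); }

        public Collection<String> allNamedTopologies() { return new ArrayList<>(namedTopologies); }
    }

    private final TopologyMetadataStub topologyMetadata = new TopologyMetadataStub();

    public TopologyMetadataStub metadata() { return topologyMetadata; }

    // The reviewed version called getAllTopologies() from inside getAllTopologies(), so every
    // call recursed until a StackOverflowError. Delegating to the metadata object ends the recursion.
    public Collection<String> getAllTopologies() {
        return topologyMetadata.allNamedTopologies();
    }

    public static void main(String[] args) {
        NamedTopologyWrapperSketch streams = new NamedTopologyWrapperSketch();
        streams.metadata().register("topology-A");
        streams.metadata().register("topology-B");
        System.out.println(streams.getAllTopologies()); // [topology-A, topology-B]
    }
}
```

Returning a copy keeps callers from mutating the wrapper's internal set through the returned collection.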
[GitHub] [kafka] cmccabe commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name
cmccabe commented on pull request #11807: URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051328712 @hachikuji: looks like `AuthorizerIntegrationTest#testHostAddressBasedAcls` needs to be fixed.
[GitHub] [kafka] cmccabe commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name
cmccabe commented on pull request #11807: URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051328911
```
[2022-02-25T22:07:29.188Z] AuthorizerIntegrationTest > testHostAddressBasedAcls(String) > kafka.api.AuthorizerIntegrationTest.testHostAddressBasedAcls(String)[2] FAILED
[2022-02-25T22:07:29.188Z]     org.opentest4j.AssertionFailedError: expected: but was:
[2022-02-25T22:07:29.188Z]         at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
[2022-02-25T22:07:29.188Z]         at app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
[2022-02-25T22:07:29.188Z]         at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
[2022-02-25T22:07:29.188Z]         at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
[2022-02-25T22:07:29.188Z]         at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
[2022-02-25T22:07:29.188Z]         at app//kafka.api.AuthorizerIntegrationTest.testHostAddressBasedAcls(AuthorizerIntegrationTest.scala:2434)
```
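KAFKA-13698 is about matching ACL hosts against the client's IP address rather than its host name. In java.net.InetAddress, getHostAddress() returns the textual IP without a lookup, while getHostName() may perform a reverse DNS lookup and return a name that an IP-based ACL would never match. A minimal sketch of address-based matching with the `*` wildcard host; `AclHostMatchSketch` is a hypothetical helper, not the KRaft authorizer code.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper illustrating address-based ACL host matching; not Kafka's authorizer code.
public final class AclHostMatchSketch {

    static final String WILDCARD_HOST = "*";

    private AclHostMatchSketch() {}

    /** Matches an ACL host entry against the client's IP address string, never its DNS name. */
    public static boolean hostMatches(String aclHost, InetAddress client) {
        // getHostAddress() is the literal IP; getHostName() could trigger a reverse lookup.
        return WILDCARD_HOST.equals(aclHost) || aclHost.equals(client.getHostAddress());
    }

    public static void main(String[] args) throws UnknownHostException {
        // getByAddress builds the address from raw bytes without any DNS lookup.
        InetAddress client = InetAddress.getByAddress(new byte[] {(byte) 192, (byte) 168, 0, 10});
        System.out.println(hostMatches("192.168.0.10", client)); // true
        System.out.println(hostMatches("*", client));            // true
        System.out.println(hostMatches("10.0.0.1", client));     // false
    }
}
```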
[GitHub] [kafka] wcarlson5 opened a new pull request #11808: KAFKA-13281: list all named topologies
wcarlson5 opened a new pull request #11808: URL: https://github.com/apache/kafka/pull/11808 List all the named topologies that have been added to this client ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
philipnee commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815182457 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.Callable; + +@RunWith(PowerMockRunner.class) +public class RetryUtilTest { + +@Mock Review comment: Oh, good call. The mock exists because I was doing something else. Deleting this line.
[GitHub] [kafka] hachikuji commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name
hachikuji commented on pull request #11807: URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051297524 New integration test is not working on Jenkins. I was afraid of that. I'll have to see if there's a better approach.
[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
rhauch commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815174043 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable at least once, optionally retrying the callable if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If all retries are exhausted, + * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown. + * + * If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. 
+ * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable the function to execute. + * @param maxRetries maximum number of retries; must be 0 or more + * @param retryBackoffMs the number of milliseconds to delay upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; must be 0 or more + * Review comment: Nit on the spacing so the description of parameters is column-aligned. ```suggestion * @param callable the function to execute. * @param maxRetries maximum number of retries; must be 0 or more * @param retryBackoffMs the number of milliseconds to delay upon receiving a * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; * must be 0 or more ``` ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.Callable; + +@RunWith(PowerMockRunner.class) +public class RetryUtilTest { + +@Mock Review comment: Do we need this line? I think not, since you're instantiating `mockCallable` in `setUp()`. The annotation is really only needed when the Mockito JUnit runner is used to initialize the mock fields. Also, IIRC we can get rid of the `@RunWith(PowerMockRunner.class)` line as well, since this code is not really using anything from PowerMock. It'd be great if we could avoid using PowerMock in new code.
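For illustration only: when a test just needs a `Callable` that fails a fixed number of times before succeeding, a small hand-written stub can stand in for the mock and needs neither PowerMock nor Mockito. `FlakyCallable` below is a hypothetical class, not part of the PR; its name and constructor are assumptions.

```java
import java.util.concurrent.Callable;

// Hypothetical test double (not from the PR): throws the supplied exception
// for the first `failures` calls, then returns `result`. Every call is counted.
final class FlakyCallable<T> implements Callable<T> {
    private final int failures;
    private final RuntimeException error;
    private final T result;
    private int calls = 0;

    FlakyCallable(int failures, RuntimeException error, T result) {
        this.failures = failures;
        this.error = error;
        this.result = result;
    }

    @Override
    public T call() {
        calls++;
        if (calls <= failures) {
            throw error;
        }
        return result;
    }

    // Number of times call() has been invoked so far.
    int calls() {
        return calls;
    }
}
```

A test could then pass, for example, `new FlakyCallable<>(2, new TimeoutException("timeout"), "success")` to `RetryUtil.retry` with `maxRetries` of at least 2 and assert on both the returned value and `calls()`.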
[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
rhauch commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815158301 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. 
If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. + * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +final long maxAttempts = maxRetries + 1; +while (++attempt <= maxAttempts) { +try { +return callable.call(); +} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { +log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " + +"Reason: {}", attempt, maxRetries - attempt, e.getMessage()); +lastError = e; +} catch (WakeupException e) { +lastError = e; +} catch (Exception e) { +log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage()); +throw e; +} +Utils.sleep(retryBackoffMs); Review comment: Yeah, I think it's worth the bit of logic to fail more quickly.
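In the quoted loop, `Utils.sleep(retryBackoffMs)` also runs after the final failed attempt, so the caller waits one extra backoff before getting the error. A minimal sketch of the "fail more quickly" variant, which skips the backoff when no attempt follows, is below. It assumes hypothetical `RetriableError` and `RetriesExhaustedException` classes standing in for Kafka's `RetriableException` and `ConnectException` so it compiles without Kafka; it is not the PR's code.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for Kafka's RetriableException hierarchy.
class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

// Hypothetical stand-in for ConnectException.
class RetriesExhaustedException extends RuntimeException {
    RetriesExhaustedException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RetrySketch {
    private RetrySketch() {
    }

    // Runs the callable at most maxRetries + 1 times. Only RetriableError is
    // retried; any other exception propagates immediately.
    static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
        if (maxRetries < 0 || retryBackoffMs < 0) {
            throw new IllegalArgumentException("maxRetries and retryBackoffMs must be 0 or more");
        }
        final long maxAttempts = maxRetries + 1;
        RetriableError lastError = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (RetriableError e) {
                lastError = e;
            }
            // Fail fast: back off only if another attempt will actually follow.
            if (attempt < maxAttempts && retryBackoffMs > 0) {
                Thread.sleep(retryBackoffMs);
            }
        }
        throw new RetriesExhaustedException(
            "Failed to complete the task after " + maxAttempts + " attempts", lastError);
    }
}
```

The final message reports `maxAttempts` (that is, `maxRetries + 1`) because that is how many times the callable actually ran when retries are exhausted.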
[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
philipnee commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815158013 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. 
If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. + * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +final long maxAttempts = maxRetries + 1; +while (++attempt <= maxAttempts) { +try { +return callable.call(); +} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { +log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " + +"Reason: {}", attempt, maxRetries - attempt, e.getMessage()); Review comment: We probably should log the exception, in fact. Making the change.
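Logging the exception itself, rather than only `e.getMessage()`, keeps its type, stack trace and cause chain in the log. With SLF4J (which `RetryUtil` uses), passing the exception as the last argument, beyond those consumed by `{}` placeholders, makes the logger print its stack trace. The sketch below shows the same idea with `java.util.logging` so that it runs without extra dependencies; `RetryLogging` and `logRetriable` are hypothetical names. It computes the remaining attempts as `maxRetries + 1 - attempt`, since `attempt` starts at 1 and there are `maxRetries + 1` attempts in total.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class RetryLogging {
    private static final Logger LOG = Logger.getLogger(RetryLogging.class.getName());

    private RetryLogging() {
    }

    static Logger logger() {
        return LOG;
    }

    // Logs a retriable failure. Passing the Throwable itself (instead of only
    // e.getMessage()) keeps the exception type, stack trace and cause chain.
    static void logRetriable(Exception e, long attempt, long maxRetries) {
        long remaining = maxRetries + 1 - attempt;
        String message = String.format(
            "Retriable exception caught on attempt %d, retrying automatically up to %d more times",
            attempt, remaining);
        LOG.log(Level.WARNING, message, e);
    }
}
```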
[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
philipnee commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815156025 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. 
If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. + * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +final long maxAttempts = maxRetries + 1; +while (++attempt <= maxAttempts) { +try { +return callable.call(); +} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { +log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " + +"Reason: {}", attempt, maxRetries - attempt, e.getMessage()); +lastError = e; +} catch (WakeupException e) { +lastError = e; +} catch (Exception e) { +log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage()); +throw e; +} +Utils.sleep(retryBackoffMs); Review comment: Agreed, and I actually had the exact same thought, though I thought it would be fine to wait for an additional x amount of time.
[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
philipnee commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815154741 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. 
If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. + * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +final long maxAttempts = maxRetries + 1; +while (++attempt <= maxAttempts) { +try { +return callable.call(); +} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { +log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " + +"Reason: {}", attempt, maxRetries - attempt, e.getMessage()); +lastError = e; +} catch (WakeupException e) { +lastError = e; +} catch (Exception e) { +log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage()); +throw e; +} +Utils.sleep(retryBackoffMs); +} + +throw new ConnectException("Fail to retry the task after " + maxRetries + " attempts. Reason: " + lastError, lastError); Review comment: Agreed.
[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
rhauch commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815104923 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -319,9 +319,18 @@ private void poll(long timeoutMs) { } } -private void readToLogEnd() { +/** + * This method finds the end offsets using the listOffsets method of the admin client. + * As the listOffsets method might throw a {@link RetriableException}, the shouldRetry + * flag enables retry, upon catching such exception, if it is set to True. + * + * @param shouldRetry Boolean flag to enable retry for the admin client listOffsets call. Review comment: Maybe add: ```suggestion * @param shouldRetry Boolean flag to enable retry for the admin client listOffsets call. * @see TopicAdmin#retryEndOffsets ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -319,9 +319,18 @@ private void poll(long timeoutMs) { } } -private void readToLogEnd() { +/** + * This method finds the end offsets using the listOffsets method of the admin client. + * As the listOffsets method might throw a {@link RetriableException}, the shouldRetry + * flag enables retry, upon catching such exception, if it is set to True. + * + * @param shouldRetry Boolean flag to enable retry for the admin client listOffsets call. + */ + +private void readToLogEnd(boolean shouldRetry) { Set assignment = consumer.assignment(); -Map endOffsets = readEndOffsets(assignment); +// it will subsequently invoke the listOffsets call here Review comment: I don't think this comment adds much value, especially because "subsequently" is ambiguous. IMO the method call stands on its own. ```suggestion ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. 
+ * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +final long maxAttempts = maxRetries + 1; +while (++attempt <= maxAttempts) { +try { +return callable.call(); +} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) { +log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " + +
[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …
C0urante commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r814970233 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ## @@ -57,15 +71,46 @@ private final Herder herder; private final List connectorPlugins; -private static final List> CONNECTOR_EXCLUDES = Arrays.asList( +static final List> CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSourceConnector.class, VerifiableSinkConnector.class, MockConnector.class, MockSourceConnector.class, MockSinkConnector.class, SchemaSourceConnector.class ); +@SuppressWarnings("rawtypes") +static final List> TRANSFORM_EXCLUDES = Collections.singletonList( +PredicatedTransformation.class +); + public ConnectorPluginsResource(Herder herder) { this.herder = herder; this.connectorPlugins = new ArrayList<>(); + +// TODO: improve once plugins are allowed to be added/removed during runtime. +for (PluginDesc plugin : herder.plugins().sinkConnectors()) { +if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { +connectorPlugins.add(new ConnectorPluginInfo(plugin)); +} +} +for (PluginDesc plugin : herder.plugins().sourceConnectors()) { +if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { +connectorPlugins.add(new ConnectorPluginInfo(plugin)); +} +} +for (PluginDesc> transform : herder.plugins().transformations()) { +if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) { +connectorPlugins.add(new ConnectorPluginInfo(transform)); +} +} +for (PluginDesc> predicate : herder.plugins().predicates()) { +connectorPlugins.add(new ConnectorPluginInfo(predicate)); +} +for (PluginDesc converter : herder.plugins().converters()) { +connectorPlugins.add(new ConnectorPluginInfo(converter)); +} +for (PluginDesc headerConverter : herder.plugins().headerConverters()) { +connectorPlugins.add(new ConnectorPluginInfo(headerConverter)); +} Review comment: Now that we have separate `Plugins::sinkConnectors` and 
`Plugins::sourceConnectors` methods, we can abstract this a little, which should improve readability a bit and make it easier to extend for other plugin types in the future: ```suggestion static final List> SINK_CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSinkConnector.class, MockSinkConnector.class ); static final List> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSourceConnector.class, MockSourceConnector.class, SchemaSourceConnector.class ); @SuppressWarnings({"unchecked", "rawtypes"}) static final List>> TRANSFORM_EXCLUDES = Collections.singletonList( (Class) PredicatedTransformation.class ); public ConnectorPluginsResource(Herder herder) { this.herder = herder; this.connectorPlugins = new ArrayList<>(); // TODO: improve once plugins are allowed to be added/removed during runtime. addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES); addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES); addConnectorPlugins(herder.plugins().transformations(), TRANSFORM_EXCLUDES); addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet()); addConnectorPlugins(herder.plugins().converters(), Collections.emptySet()); addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet()); } private void addConnectorPlugins(Collection> plugins, Collection> excludes) { plugins.stream() .filter(p -> !excludes.contains(p.pluginClass())) .map(ConnectorPluginInfo::new) .forEach(connectorPlugins::add); ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ## @@ -750,4 +755,41 @@ private String trace(Throwable t) { return keys; } +@Override +public List connectorPluginConfig(String pluginName) { +List results = new ArrayList<>(); +ConfigDef configDefs; +try { +Plugins p = plugins(); +Object plugin = p.newPlugin(pluginName); +PluginType pluginType = PluginType.from(plugin.getClass()); +switch (pluginType) { +case SINK: +case SOURCE: +configDefs = 
((Connector) plugin).config(); +break; +case CONVERTER: +
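The suggested `addConnectorPlugins` helper comes down to one step: filter out excluded plugin classes, then add the rest. Below is a minimal, dependency-free Java sketch of that filtering step. `PluginDescSketch` and `PluginListingSketch` are hypothetical stand-ins, not Connect's `PluginDesc` or `ConnectorPluginsResource`. The sketch records simple class names instead of building `ConnectorPluginInfo` objects.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Connect's PluginDesc: only the plugin class matters here.
final class PluginDescSketch<T> {
    private final Class<? extends T> pluginClass;

    PluginDescSketch(Class<? extends T> pluginClass) {
        this.pluginClass = pluginClass;
    }

    Class<? extends T> pluginClass() {
        return pluginClass;
    }
}

// Hypothetical resource that collects plugin names, one helper call per plugin type.
final class PluginListingSketch {
    private final List<String> pluginNames = new ArrayList<>();

    // Same shape as the suggested addConnectorPlugins(plugins, excludes):
    // skip any descriptor whose class is in the exclude list, keep the rest in order.
    <T> void addPlugins(Collection<PluginDescSketch<T>> plugins,
                        Collection<? extends Class<?>> excludes) {
        plugins.stream()
                .filter(p -> !excludes.contains(p.pluginClass()))
                .map(p -> p.pluginClass().getSimpleName())
                .forEach(pluginNames::add);
    }

    List<String> pluginNames() {
        return Collections.unmodifiableList(pluginNames);
    }
}
```

If you pass an empty collection as `excludes`, as the suggestion does for predicates and converters, every plugin of that type is kept.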
[GitHub] [kafka] hachikuji opened a new pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name
hachikuji opened a new pull request #11807: URL: https://github.com/apache/kafka/pull/11807 Use `InetAddress.getHostAddress` in `StandardAuthorizer` instead of `InetAddress.getHostName`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13698) KRaft authorizer should check host address instead of name
Jason Gustafson created KAFKA-13698: --- Summary: KRaft authorizer should check host address instead of name Key: KAFKA-13698 URL: https://issues.apache.org/jira/browse/KAFKA-13698 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson StandardAuthorizer currently uses `InetAddress.getHostName` to validate hosts specified by ACLs. It should use `InetAddress.getHostAddress` instead as we do in `AclAuthorizer`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
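The two `InetAddress` methods behave differently for ACL matching. `getHostAddress` returns the literal IP string. `getHostName` returns a host name and may trigger a reverse DNS lookup. The sketch below assumes ACL host fields hold either a literal IP address or the `*` wildcard. `HostMatchSketch` is a hypothetical helper, not `StandardAuthorizer`'s actual code.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class HostMatchSketch {
    // Builds an InetAddress without any DNS lookup; the name is just attached to the IP.
    static InetAddress literal(String hostName, byte[] ip) {
        try {
            return InetAddress.getByAddress(hostName, ip);
        } catch (UnknownHostException e) {
            // Only thrown when the byte array has an illegal length.
            throw new IllegalArgumentException(e);
        }
    }

    // Hypothetical ACL host check: compare against the client's literal IP address
    // (getHostAddress), never against a possibly reverse-resolved name (getHostName).
    static boolean hostMatches(String aclHost, InetAddress client) {
        return "*".equals(aclHost) || aclHost.equals(client.getHostAddress());
    }
}
```

With a name-based comparison, an ACL written for `10.0.0.7` would not match a client whose address resolves to a name.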
[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
philipnee commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815023731 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -319,9 +319,11 @@ private void poll(long timeoutMs) { } } -private void readToLogEnd() { +private void readToLogEnd(boolean shouldRetry) { Set assignment = consumer.assignment(); -Map endOffsets = readEndOffsets(assignment); +// readEndOffsets makes listOffsets call to adminClient, if shouldRetry is set to True, the adminClinet +// will retry on RetriableExceptions Review comment: Yes! Good call.
[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
rhauch commented on a change in pull request #11797: URL: https://github.com/apache/kafka/pull/11797#discussion_r815017009 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class RetryUtil { +private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + +/** + * The method executes the callable, and performs retries if + * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown. If other types of exceptions is + * caught, then the same exception will be rethrown. If all retries are exhausted, then the last + * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown. + * + * The callable task will be executed at least once. 
If maxRetries is set to 0, the task will be + * executed exactly once. If maxRetries is set to n, the callable will be executed at + * most n + 1 times. + * + * If retryBackoffMs is set to 0, no wait will happen in between the retries. + * + * @param callable The task to execute. + * @param maxRetries Maximum number of retries. + * @param retryBackoffMs Delay time to retry the callable task upon receiving a + * {@link org.apache.kafka.connect.errors.RetriableException}. + * + * @throws ConnectException If the task exhausted all the retries. + */ +public static T retry(Callable callable, long maxRetries, long retryBackoffMs) throws Exception { +Throwable lastError = null; +int attempt = 0; +long maxAttempts = maxRetries + 1; +while (attempt++ < maxAttempts) { Review comment: Should this be: ```suggestion while (++attempt <= maxAttempts) { ``` so we could use a 1-based attempt in the log message on line 58-59?
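The Javadoc sets the attempt count: at most `maxRetries + 1` calls, and exactly one call when `maxRetries` is 0. The review suggests a pre-increment loop so that `attempt` is 1-based in log messages. Here is a minimal, self-contained Java sketch of that contract. `RetriableSketchException` is a hypothetical stand-in for Kafka's `RetriableException`, and `IllegalStateException` stands in for `ConnectException`. This is not the PR's implementation.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for Kafka's RetriableException, so the sketch has no dependencies.
class RetriableSketchException extends RuntimeException {
    RetriableSketchException(String message) {
        super(message);
    }
}

final class RetrySketch {
    static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
        long maxAttempts = maxRetries + 1; // maxRetries == 0 still means one attempt
        RetriableSketchException lastError = null;
        int attempt = 0;
        // Pre-increment: `attempt` is 1-based inside the body, as the review suggests.
        while (++attempt <= maxAttempts) {
            try {
                return callable.call(); // non-retriable exceptions propagate unchanged
            } catch (RetriableSketchException e) {
                lastError = e;
                System.err.printf("Attempt %d/%d failed: %s%n", attempt, maxAttempts, e.getMessage());
                if (attempt < maxAttempts && retryBackoffMs > 0) {
                    Thread.sleep(retryBackoffMs); // no sleep after the final attempt
                }
            }
        }
        // Stand-in for wrapping the last error in a ConnectException.
        throw new IllegalStateException("Exhausted " + maxAttempts + " attempt(s)", lastError);
    }
}
```

With the pre-increment form, the value logged on the last failure equals `maxAttempts`, so "Attempt 3/3" reads naturally.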
[GitHub] [kafka] hachikuji commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
hachikuji commented on a change in pull request #11806: URL: https://github.com/apache/kafka/pull/11806#discussion_r815011402 ## File path: metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java ## @@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception { withId(newBarAcl(ALTER_CONFIGS, ALLOW))); fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); -assertEquals(Collections.singletonList(ALLOWED), +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(), -Collections.singletonList(newAction(READ, TOPIC, "foo_"; -assertEquals(Collections.singletonList(ALLOWED), +singletonList(newAction(READ, TOPIC, "foo_"; +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(), -Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +} + +@Test +public void testTopicAclWithOperationAll() throws Exception { +StandardAuthorizer authorizer = new StandardAuthorizer(); +authorizer.configure(Collections.emptyMap()); +List acls = Arrays.asList( +new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW), +new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW), +new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW) +); + +acls.forEach(acl -> { +StandardAclWithId aclWithId = withId(acl); +authorizer.addAcl(aclWithId.id(), aclWithId.acl()); +}); + +assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize( +newRequestContext("alice"), +Arrays.asList( +newAction(WRITE, TOPIC, "foo"), +newAction(DESCRIBE_CONFIGS, TOPIC, "bar"), +newAction(DESCRIBE, TOPIC, "baz"; + +assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( 
+newRequestContext("bob"), +Arrays.asList( +newAction(WRITE, TOPIC, "foo"), +newAction(READ, TOPIC, "bar"), +newAction(DESCRIBE, TOPIC, "baz"; Review comment: Sorry, I think I had this as a DENY rule at one point. Will fix.
[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
cmccabe commented on a change in pull request #11806: URL: https://github.com/apache/kafka/pull/11806#discussion_r815011139 ## File path: metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java ## @@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception { withId(newBarAcl(ALTER_CONFIGS, ALLOW))); fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); -assertEquals(Collections.singletonList(ALLOWED), +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(), -Collections.singletonList(newAction(READ, TOPIC, "foo_"; +singletonList(newAction(READ, TOPIC, "foo_"; -assertEquals(Collections.singletonList(ALLOWED), +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(), -Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +} + +@Test +public void testTopicAclWithOperationAll() throws Exception { +StandardAuthorizer authorizer = new StandardAuthorizer(); +authorizer.configure(Collections.emptyMap()); +List acls = Arrays.asList( Review comment: Can we add a test of DENY logic when ALL is in use? Can probably just add an additional test assert or something
[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
cmccabe commented on a change in pull request #11806: URL: https://github.com/apache/kafka/pull/11806#discussion_r815010107 ## File path: metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java ## @@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception { withId(newBarAcl(ALTER_CONFIGS, ALLOW))); fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); -assertEquals(Collections.singletonList(ALLOWED), +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(), -Collections.singletonList(newAction(READ, TOPIC, "foo_"; -assertEquals(Collections.singletonList(ALLOWED), +singletonList(newAction(READ, TOPIC, "foo_"; +assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(), -Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"; +} + +@Test +public void testTopicAclWithOperationAll() throws Exception { +StandardAuthorizer authorizer = new StandardAuthorizer(); +authorizer.configure(Collections.emptyMap()); +List acls = Arrays.asList( +new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW), +new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW), +new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW) +); + +acls.forEach(acl -> { +StandardAclWithId aclWithId = withId(acl); +authorizer.addAcl(aclWithId.id(), aclWithId.acl()); +}); + +assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize( +newRequestContext("alice"), +Arrays.asList( +newAction(WRITE, TOPIC, "foo"), +newAction(DESCRIBE_CONFIGS, TOPIC, "bar"), +newAction(DESCRIBE, TOPIC, "baz"; + +assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( 
+newRequestContext("bob"), +Arrays.asList( +newAction(WRITE, TOPIC, "foo"), +newAction(READ, TOPIC, "bar"), +newAction(DESCRIBE, TOPIC, "baz"; Review comment: wait, why does `DESCRIBE TOPIC baz` fail for bob given that we have: ``` new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW) ``` ?
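The following is a stripped-down model of how the three `ALL` ACLs in `testTopicAclWithOperationAll` should evaluate, including the DENY case requested earlier in the thread. `AclSketch` is hypothetical. It ignores hosts, the literal `*` resource wildcard, and implied operations such as READ implying DESCRIBE. It assumes only two rules: `ALL` matches every operation, and a matching DENY overrides any ALLOW.

```java
import java.util.ArrayList;
import java.util.List;

final class AclSketch {
    enum Op { ALL, READ, WRITE, DESCRIBE, DESCRIBE_CONFIGS }
    enum PatternType { LITERAL, PREFIXED }
    enum Permission { ALLOW, DENY }
    enum Result { ALLOWED, DENIED }

    private static final class Acl {
        final String resource;
        final PatternType patternType;
        final String principal;
        final Op op;
        final Permission permission;

        Acl(String resource, PatternType patternType, String principal, Op op, Permission permission) {
            this.resource = resource;
            this.patternType = patternType;
            this.principal = principal;
            this.op = op;
            this.permission = permission;
        }
    }

    private final List<Acl> acls = new ArrayList<>();

    void add(String topic, PatternType type, String principal, Op op, Permission permission) {
        acls.add(new Acl(topic, type, principal, op, permission));
    }

    Result authorize(String user, Op requested, String topic) {
        boolean allowed = false;
        for (Acl acl : acls) {
            boolean topicMatches = acl.patternType == PatternType.LITERAL
                    ? acl.resource.equals(topic)
                    : topic.startsWith(acl.resource);
            boolean principalMatches = acl.principal.equals("User:*")
                    || acl.principal.equals("User:" + user);
            boolean opMatches = acl.op == Op.ALL || acl.op == requested; // ALL matches any operation
            if (!topicMatches || !principalMatches || !opMatches) {
                continue;
            }
            if (acl.permission == Permission.DENY) {
                return Result.DENIED; // a matching DENY wins over any ALLOW
            }
            allowed = true;
        }
        return allowed ? Result.ALLOWED : Result.DENIED;
    }
}
```

Under this model, bob's `DESCRIBE` on `baz` comes out ALLOWED, which is the point of the question above. Adding a `DENY ALL` rule for bob on `foo` flips his `WRITE` to DENIED and leaves alice's access unchanged.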
[GitHub] [kafka] hachikuji commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
hachikuji commented on a change in pull request #11806: URL: https://github.com/apache/kafka/pull/11806#discussion_r815008809 ## File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java ## @@ -391,11 +392,13 @@ static AuthorizationResult findResult(Action action, if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; break; default: -if (!action.operation().equals(acl.operation())) return null; +if (acl.operation() != ALL && action.operation() != acl.operation()) { Review comment: Heh. Yeah, makes sense.
[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
cmccabe commented on a change in pull request #11806: URL: https://github.com/apache/kafka/pull/11806#discussion_r815008336 ## File path: metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java ## @@ -391,11 +392,13 @@ static AuthorizationResult findResult(Action action, if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; break; default: -if (!action.operation().equals(acl.operation())) return null; +if (acl.operation() != ALL && action.operation() != acl.operation()) { Review comment: Hmm, would it be easier to just put ``` if (acl.operation() != ALL) { } ``` around the whole block? So ``` if (acl.operation() != ALL) { if (acl.permissionType().equals(ALLOW)) { switch stuff ... } else { if (!action.operation().equals(acl.operation())) return null; } } ```
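cmccabe's restructuring puts a single `acl.operation() != ALL` guard around the ALLOW/DENY logic. The Java sketch below shows that control flow as a standalone predicate. The enum values and the contents of `IMPLIES_DESCRIBE` and `IMPLIES_DESCRIBE_CONFIGS` are assumptions modelled on the names in the diff. Returning a `boolean` instead of `null` or an `AuthorizationResult` is a simplification.

```java
import java.util.EnumSet;
import java.util.Set;

final class OperationMatchSketch {
    enum Operation { ALL, READ, WRITE, DELETE, ALTER, DESCRIBE, ALTER_CONFIGS, DESCRIBE_CONFIGS }
    enum PermissionType { ALLOW, DENY }

    // Assumed contents, modelled on the IMPLIES_DESCRIBE / IMPLIES_DESCRIBE_CONFIGS sets in the diff.
    static final Set<Operation> IMPLIES_DESCRIBE = EnumSet.of(
            Operation.DESCRIBE, Operation.READ, Operation.WRITE, Operation.DELETE, Operation.ALTER);
    static final Set<Operation> IMPLIES_DESCRIBE_CONFIGS = EnumSet.of(
            Operation.DESCRIBE_CONFIGS, Operation.ALTER_CONFIGS);

    // Returns true when an ACL with (aclOp, permission) applies to the requested operation.
    static boolean applies(Operation requested, Operation aclOp, PermissionType permission) {
        if (aclOp == Operation.ALL) {
            return true; // outer guard: ALL applies to every operation, for ALLOW and DENY alike
        }
        if (permission == PermissionType.ALLOW) {
            switch (requested) {
                case DESCRIBE:
                    return IMPLIES_DESCRIBE.contains(aclOp);
                case DESCRIBE_CONFIGS:
                    return IMPLIES_DESCRIBE_CONFIGS.contains(aclOp);
                default:
                    return requested == aclOp;
            }
        }
        return requested == aclOp; // DENY rules only cover the exact operation they name
    }
}
```

The advantage of the outer guard is that `ALL` is handled once. The alternative is to repeat the check in each `case`.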
[GitHub] [kafka] junrao commented on a change in pull request #10591: Fix minor bugs in the existing documentation
junrao commented on a change in pull request #10591: URL: https://github.com/apache/kafka/pull/10591#discussion_r815000220 ## File path: docs/ops.html ## @@ -78,7 +78,7 @@ auto.leader.rebalance.enable=true You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command: - bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port + bin/kafka-leader-election.sh --bootstrap-server broker_host:port Review comment: Yes, we did move to the kafka-leader-election tool. Could we add the complete command line to do preferred leader election? I think we need to add --election-type preferred. ## File path: docs/security.html ## @@ -2092,12 +2092,12 @@ 7.6.3 Migrating the ZooKeeper ensemble It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. See above for mTLS information. Please refer to the ZooKeeper documentation for more detail: -https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl;>Apache ZooKeeper documentation +https://zookeeper.apache.org/doc/r3.5.9/zookeeperProgrammers.html#sc_ZooKeeperAccessControl;>Apache ZooKeeper documentation Review comment: The latest ZK version for Kafka is 3.6.3 now.
[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …
mimaison commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r814995987 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ## @@ -79,11 +82,21 @@ public int hashCode() { @Override public String toString() { -final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{"); -sb.append("className='").append(className).append('\''); -sb.append(", type=").append(type); -sb.append(", version='").append(version).append('\''); -sb.append('}'); -return sb.toString(); +return "ConnectorPluginInfo{" + "className='" + className + '\'' + +", type=" + type.toString() + +", version='" + version + '\'' + +'}'; +} + +public static final class NoVersionFilter { +@Override +public boolean equals(Object obj) { +return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj); +} + +@Override +public int hashCode() { +return super.hashCode(); +} Review comment: Yes that's a good idea ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ## @@ -79,11 +82,21 @@ public int hashCode() { Review comment: Yes that's a good idea, done!
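`NoVersionFilter` relies on an equals-based filter: the code holding the filter calls `filter.equals(value)` and omits the value when the result is true. Connect's REST layer uses Jackson, where such a class is typically wired up via `@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ...)`. Treat that pairing as an assumption, since the diff does not show the annotation. The dependency-free sketch below replays the same idea. The `"undefined"` sentinel value and the `toFields` serializer step are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class VersionFilterSketch {
    // Assumed sentinel; in Connect this comes from DelegatingClassLoader.UNDEFINED_VERSION.
    static final String UNDEFINED_VERSION = "undefined";

    // Same idea as NoVersionFilter: equals() answers "should this value be filtered out?"
    static final class NoVersionFilter {
        @Override
        public boolean equals(Object obj) {
            return UNDEFINED_VERSION.equals(obj);
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }

    // Hypothetical serializer step: omit the version field when filter.equals(version) is true.
    static Map<String, Object> toFields(String className, String type, String version) {
        Object filter = new NoVersionFilter();
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("class", className);
        fields.put("type", type);
        if (!filter.equals(version)) {
            fields.put("version", version);
        }
        return fields;
    }
}
```

This `equals` is intentionally asymmetric: `"undefined".equals(filter)` is false. That is acceptable only because the filter is always the receiver of the call.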
[GitHub] [kafka] hachikuji opened a new pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL
hachikuji opened a new pull request #11806: URL: https://github.com/apache/kafka/pull/11806 AclOperation.ALL implies all other operation types, but we are not checking for it in StandardAuthorizer. The patch fixes the issue and adds some test cases. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #11796: Kip 770
guozhangwang commented on pull request #11796: URL: https://github.com/apache/kafka/pull/11796#issuecomment-1051079006 Hello @vamossagar12 I checked out your branch and ran `streams:compileTestJava` on my local machine, and it also fails with: ``` > Task :streams:compileTestJava /Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java:95: warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), ^ error: warnings found and -Werror specified 1 error 1 warning > Task :streams:compileTestJava FAILED ``` I guess your branch was not rebased on top of `trunk`, and maybe that's why you did not see the failure?
[jira] [Created] (KAFKA-13697) KRaft authorizer should handle AclOperation.ALL
Jason Gustafson created KAFKA-13697: --- Summary: KRaft authorizer should handle AclOperation.ALL Key: KAFKA-13697 URL: https://issues.apache.org/jira/browse/KAFKA-13697 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson AclOperation.ALL implies other permissions, but we are not currently checking for it in StandardAuthorizer.
[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on pull request #11733: URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051077042 @hachikuji I pushed a commit so that both controller implementations only persist the leader recovery state if the cluster supports leader recovery state (IBP is greater than 3.2). I am currently working on the tests.
[GitHub] [kafka] junrao commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
junrao commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814981129 ## File path: core/src/main/scala/kafka/log/remote/RemoteLogManager.scala ## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.cluster.Partition +import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.Logging +import org.apache.kafka.common._ +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager +import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{Closeable, InputStream} +import java.security.{AccessController, PrivilegedAction} +import java.util +import java.util.Optional +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.Set +import scala.jdk.CollectionConverters._ + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments. 
+ * + * @param rlmConfig + * @param brokerId + * @param logDir + */ +class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, + brokerId: Int, + logDir: String) extends Logging with Closeable with KafkaMetricsGroup { + + // topic ids received on leadership changes + private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]() + + private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager() + private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager() + + private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir) + + private var closed = false + + private[remote] def createRemoteStorageManager(): RemoteStorageManager = { +def createDelegate(classLoader: ClassLoader): RemoteStorageManager = { + classLoader.loadClass(rlmConfig.remoteStorageManagerClassName()) + .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager] +} + +AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] { + private val classPath = rlmConfig.remoteStorageManagerClassPath() + + override def run(): RemoteStorageManager = { + if (classPath != null && classPath.trim.nonEmpty) { +val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) +val delegate = createDelegate(classLoader) +new ClassLoaderAwareRemoteStorageManager(delegate, classLoader) + } else { +createDelegate(this.getClass.getClassLoader) + } + } +}) + } + + private def configureRSM(): Unit = { +val rsmProps = new util.HashMap[String, Any]() +rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) } +rsmProps.put(KafkaConfig.BrokerIdProp, brokerId) +remoteLogStorageManager.configure(rsmProps) + } + + private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = { +def createDelegate(classLoader: ClassLoader) = { + 
classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName()) +.getDeclaredConstructor() +.newInstance() +.asInstanceOf[RemoteLogMetadataManager] +} + +AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] { + private val classPath = rlmConfig.remoteLogMetadataManagerClassPath + + override def run(): RemoteLogMetadataManager = { +if (classPath != null && classPath.trim.nonEmpty) { + val
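The Scala `createRemoteStorageManager` uses a common plugin-loading pattern. It first picks a class loader, building an isolated `ChildFirstClassLoader` only when a class path is configured. It then instantiates the configured class reflectively. Below is a Java sketch of those two steps. The factory argument of `chooseLoader` is a hypothetical hook that replaces `ChildFirstClassLoader`, and the `AccessController.doPrivileged` wrapper is left out.

```java
import java.util.function.Function;

final class PluginLoaderSketch {
    // Reflectively create `className` through `loader` and check it implements `expected`,
    // the same loadClass / getDeclaredConstructor / newInstance chain as createDelegate.
    static <T> T instantiate(ClassLoader loader, String className, Class<T> expected)
            throws ReflectiveOperationException {
        Class<?> cls = loader.loadClass(className);
        return expected.cast(cls.getDeclaredConstructor().newInstance());
    }

    // Mirrors the branch in run(): build an isolated loader only for a non-blank class path.
    // `isolatedLoaderFactory` is a hypothetical hook standing in for new ChildFirstClassLoader(...).
    static ClassLoader chooseLoader(String classPath, ClassLoader current,
                                    Function<String, ClassLoader> isolatedLoaderFactory) {
        if (classPath != null && !classPath.trim().isEmpty()) {
            return isolatedLoaderFactory.apply(classPath);
        }
        return current;
    }
}
```

If you check the type with `Class.cast`, a misconfigured class name fails with a `ClassCastException` at load time and not later at first use.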
[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on pull request #11733: URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051066928 > For this [comment](https://github.com/apache/kafka/pull/11733#discussion_r812044247): > > > If the version is `0` then this is guaranteed to be the default value `0` so the serialization will succeed. This is true because we only write these values in the response when the operation succeeds. If the operation fails then we skip writing these values and instead just write the error code. > > This is a bit subjective, so feel free to disregard, but I do feel like some of the implicit assumptions might be causing some unnecessary obscurity. This is one case where a version check might actually be clearer and prevent the need for the extra comment. > > A second case is implicitly setting RECOVERED in `PendingPartitionChange`. I had a comment about this [here](https://github.com/apache/kafka/pull/11733/files#r805005385), which might have been missed. This is fine at the moment because the current patch does not do any actual recovery operation, but I think we should reconsider it when we do. Otherwise I do think it's easy to overlook the implication when making other partition state changes. Okay. As you suggested, I marked the LeaderRecoveryState field for the AlterPartition response as ignorable. This gives the behavior we want, but implemented in the serialization layer instead of in the controller logic.
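The choice discussed here is between an explicit version check in the controller and letting the serialization layer handle the old versions. The following hypothetical Java model describes the serializer-side rule, as we understand Kafka's generated message code. Below a field's first version, a default value is skipped without error and an `ignorable` field is dropped. Any other non-default value is an error. The class, exception and parameter names are illustrative. Treating the default `0` as RECOVERED is an assumption.

```java
// Hypothetical model of how a generated serializer treats a field that only
// exists from version `firstVersion` onward.
final class FieldVersionSketch {
    static final class UnsupportedVersionSketchException extends RuntimeException {
        UnsupportedVersionSketchException(String message) {
            super(message);
        }
    }

    // Returns true if the field should be written at `version`.
    static boolean shouldWrite(int version, int firstVersion, int value, int defaultValue, boolean ignorable) {
        if (version >= firstVersion) {
            return true;
        }
        if (value == defaultValue || ignorable) {
            return false; // nothing is lost, or the schema says the field may be dropped
        }
        throw new UnsupportedVersionSketchException(
                "Cannot write non-default value " + value + " at version " + version);
    }
}
```

If you mark the field ignorable, an old-version response carrying a non-default state is still serialized, and the field is dropped. Without the flag, the same write would fail.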
[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on pull request #11733: URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051064227 > Hmm, suppose we have a partition with replicas=(0) and isr=(0), but 0 is offline. The user chooses to reassign to node 1 and perform an unclean election to make it the leader. That would allow the reassignment to complete, but the partition state would be RECOVERING, right? How do we get out of that state? I think that is correct. I think what we can do here is make sure that the leader sends an AlterPartition request when it changes from RECOVERING to RECOVERED. I didn't implement it in this PR since this implementation is a noop for the recovering state. Do you mind if I implement this in a future PR? I filed: https://issues.apache.org/jira/browse/KAFKA-13696
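The fix proposed for a partition stuck in RECOVERING amounts to a small state transition on the leader. Below is a hypothetical Java sketch. The string entries in `sentRequests` stand in for real AlterPartition requests. None of the names come from the Kafka code base.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical leader-side view of one partition; names are illustrative only.
final class LeaderRecoverySketch {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    private LeaderRecoveryState state;
    private final List<String> sentRequests = new ArrayList<>();

    LeaderRecoverySketch(boolean electedUncleanly) {
        // Per the discussion, an unclean election leaves the partition RECOVERING.
        this.state = electedUncleanly ? LeaderRecoveryState.RECOVERING : LeaderRecoveryState.RECOVERED;
    }

    // Called once the new leader has finished whatever recovery it performs.
    void onRecoveryComplete() {
        if (state == LeaderRecoveryState.RECOVERING) {
            state = LeaderRecoveryState.RECOVERED;
            // Proposed rule: always report the transition so the controller's copy leaves RECOVERING too.
            sentRequests.add("AlterPartition(leaderRecoveryState=RECOVERED)");
        }
    }

    LeaderRecoveryState state() {
        return state;
    }

    List<String> sentRequests() {
        return Collections.unmodifiableList(sentRequests);
    }
}
```

The transition fires only once, so repeated completion callbacks do not produce duplicate requests.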
[jira] [Created] (KAFKA-13696) Topic partition leader should always send AlterPartition when transitioning from RECOVERING to RECOVERED
Jose Armando Garcia Sancio created KAFKA-13696: -- Summary: Topic partition leader should always send AlterPartition when transitioning from RECOVERING to RECOVERED Key: KAFKA-13696 URL: https://issues.apache.org/jira/browse/KAFKA-13696 Project: Kafka Issue Type: Task Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application
[ https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498236#comment-17498236 ] Guozhang Wang commented on KAFKA-13681: --- [~DrozD_0] We have not decided yet when it would be applied as a general optimization, but since we usually only backport non-critical changes to releases from the past year, it is unlikely that we will have new 2.x bug-fix releases. > Sink event duplicates for partition-stuck stream application > > > Key: KAFKA-13681 > URL: https://issues.apache.org/jira/browse/KAFKA-13681 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.1 >Reporter: Mikhail Dubrovin >Priority: Major > Attachments: fail_topology.txt > > > Hello, > We found the following unpredictable behavior of Kafka streams: > {code:java} > public void buildStreams(final BuilderHelper builder) { > KTable table = builder.table(); > KTable> workflowTable = > workflowTable(builder); > table > .mapValues(value -> mappers.mainDTO(value)) > .leftJoin(workflowTable, mappers::joinWorkflows) > .toStream() > .map((key, value) -> KeyValue.pair( > AggregateId.newBuilder().setId(value.getId()).build(), > mappers.aggregateDTO(value))) > .peek((k, v) -> logSinkRecord(v)) > .filter((id, dto) -> !isReprocessing) > .to(...); > } > private static KTable> > workflowTable(BuilderHelper builderHelper) { > return builderHelper.workflowTable() > .groupBy((id, workflow) -> KeyValue.pair( > > TableId.newBuilder().setId(workflow.getTableId()).build(), > mappers.mapWorkflow(workflow)), > Grouped.with(...)) > .aggregate(ArrayList::new, (key, value, agg) -> { > agg.add(value); > return agg; > }, (key, value, agg) -> { > agg.remove(value); > return agg; > }, Materialized.with(...)); > } {code} > it is a small part of our topology but it shows the error flow. > *Data structure:* > We have two many-partition topics: entity and workflow. Every topic is > represented as KTable.
> *Data error that causes application shutdown:* > Our final event (the join of the entity and workflow KTables) expects a not-null > field in the entity, but for some reason it is null for one event. The whole > aggregator fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ > method. > We have a health check which restarts the aggregator if it fails. > When incorrect data comes to one partition, processing of that partition is stuck, > but other partitions are processed. > As a result, at every restart the _workflowTable_ topology repeats the > .aggregate() add/remove flows and puts a new List into the repartition topic. > But offsets are not moved for processed partitions due to the aggregator's > shutdown. > _This behavior generates/sinks a lot of final entity duplicates at every > restart, because the flow is successful for data from non-corrupted > partitions but offsets are not moved for them._ > It also causes trouble if @EqualsAndHashCode is defined to use all > fields for comparison. At every restart, the topology tries to remove the old > value (which no longer exists after the first run) and adds a new value at the end of > the list. The list grows after each restart (it contains repeated copies of the same new value). > I also attached the topology description. To visualize: > [https://zz85.github.io/kafka-streams-viz/] > *Current workaround:* > Redefine @EqualsAndHashCode to use entities' ids only. > *Not solved issue:* > Sink events are duplicated at every restart. > Thank you in advance!
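The list growth follows from how `ArrayList.remove(Object)` works: it removes the first element that is `equals()` to the argument and does nothing otherwise. If the subtractor receives a value that no longer compares equal to what the adder stored, nothing is removed while the adder still appends. The plain-Java sketch below (no Kafka Streams API) replays that pattern with a hypothetical `Workflow` type, once with all-field equality and once with id-only equality, the reporter's workaround. The `revision` field is an assumption standing in for whatever differs between runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical aggregated value; `revision` stands in for any field that differs between runs.
final class Workflow {
    final String id;
    final long revision;
    final boolean idOnlyEquality; // true = the reporter's workaround (compare ids only)

    Workflow(String id, long revision, boolean idOnlyEquality) {
        this.id = id;
        this.revision = revision;
        this.idOnlyEquality = idOnlyEquality;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Workflow)) {
            return false;
        }
        Workflow other = (Workflow) o;
        return idOnlyEquality
                ? id.equals(other.id)
                : id.equals(other.id) && revision == other.revision;
    }

    @Override
    public int hashCode() {
        return idOnlyEquality ? id.hashCode() : Objects.hash(id, revision);
    }
}

final class ListAggregation {
    /**
     * Replays the adder/subtractor pair from the issue `restarts` times. On each replay the
     * subtractor receives a value that differs from the stored one, as described in the report.
     */
    static int sizeAfterRestarts(boolean idOnlyEquality, int restarts) {
        List<Workflow> agg = new ArrayList<>();
        agg.add(new Workflow("w1", 0, idOnlyEquality)); // first run: adder only
        for (int run = 1; run <= restarts; run++) {
            Workflow replayed = new Workflow("w1", run, idOnlyEquality);
            agg.remove(replayed); // subtractor: removes only an element that equals() it
            agg.add(replayed);    // adder: always appends
        }
        return agg.size();
    }
}
```

With all-field equality the list has four elements after three replays; with id-only equality it stays at one. Note that this only addresses the list growth; the duplicated sink events are a separate effect of offsets not being committed.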
[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away
[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498234#comment-17498234 ] Guozhang Wang commented on KAFKA-13694: --- Hello [~RivenSun] thanks for the report. If I understand you correctly, we are trying to solve two issues here: 1) Let the broker print a meaningful error message when validation fails. 2) Let the broker send the error message all the way back to the client, so that the client can also print the meaningful message. For 1) I think it's definitely a win, and for that purpose I think we can consider just replacing the `processFailedRecord(topicPartition, rve.invalidException)` call with `processFailedRecord(topicPartition, rve)`, which should then include both the invalidException and the recordErrors in the error message. For 2), what I saw is that today we do put the `recordErrors` in the `PartitionResponse`, so they are encoded back to the client and would be surfaced as part of the `producer.Callback`. So if the user does implement a Callback it should be observed; or did I miss something? > Some InvalidRecordException messages are thrown away > > > Key: KAFKA-13694 > URL: https://issues.apache.org/jira/browse/KAFKA-13694 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 3.0.0 >Reporter: RivenSun >Priority: Major > > 1.Example > Topic level config:"cleanup.policy":"compact" > But when the producer sends the message, the ProducerRecord does not specify > the key.
> > producer.log > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One > or more records have been rejected {code} > > > server.log > {code:java} > [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing > append operation on partition rivenTest4-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: One or more records have been > rejected {code} > From the producer and server logs, we cannot tell why the send failed, > only that the message was rejected by the server. > Compare this with the RecordTooLargeException test case: the producer log clearly shows the > reason for the failure, and the server does not print a > log (the reason is explained later). > producer_message_too_large.log : > {code:java} > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. > [kafka-producer-network-thread | producer-1] ERROR > us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The > request included a message larger than the max message size the server will > accept. {code} > 2.RootCause > ReplicaManager#appendToLocalLog(...) -> > Partition#appendRecordsToLeader(...) -> > UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) -> > LogValidator#validateMessagesAndAssignOffsets(...)
> 1) Analyzing the validateMessagesAndAssignOffsets method: > in the LogValidator#validateRecord method, validateKey and validateTimestamp > are called, and the error information for all messages is collected as > Seq[ApiRecordError]. > In the subsequent processRecordErrors(recordErrors) method, special processing is currently done > only for Errors.INVALID_TIMESTAMP. Because the error > returned by validateKey is the ordinary Errors.INVALID_RECORD, the > code falls through to > {code:java} > else { > throw new RecordValidationException(new InvalidRecordException( > "One or more records have been rejected"), errors) > }{code} > In fact, the *errors* variable here contains the specific information for each > recordError, but we do not put that information into the message of the > InvalidRecordException. > 2) The exception thrown by processRecordErrors is caught by > ReplicaManager#appendToLocalLog(...), so we continue by analyzing the > `catchException code` of appendToLocalLog. > This also explains why the server does not print a log for > RecordTooLargeException. > In the case rve: RecordValidationException, > the server prints the log in the processFailedRecord method > and sends a response to the client in the LogAppendResult method. > In both methods, only rve.invalidException is used; > rve.recordErrors is neither printed by the server nor returned to the > client. > 3.Solution > There are two solutions; I prefer the second. > 1) Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns > Errors.INVALID_RECORD_WITHOUT_KEY, > and the processRecordErrors method also does special processing for > Errors.INVALID_RECORD_WITHOUT_KEY >
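Both proposals in this thread (logging the whole `RecordValidationException`, and surfacing the per-record errors to the client) come down to rendering the record errors into the message instead of only "One or more records have been rejected". The sketch below is a minimal, hypothetical model: its `RecordError` only mirrors the idea of a batch index plus an optional detail message, including the `toString` the thread says is missing, and the output format is an assumption rather than what the broker actually prints.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a per-record validation error (batch index plus optional detail).
final class RecordError {
    final int batchIndex;
    final String message; // may be null when no detail is available

    RecordError(int batchIndex, String message) {
        this.batchIndex = batchIndex;
        this.message = message;
    }

    @Override
    public String toString() {
        return "RecordError(batchIndex=" + batchIndex + ", message=" + message + ")";
    }
}

final class RejectionMessage {
    static final String GENERIC = "One or more records have been rejected";

    /** Appends the per-record errors to the generic text instead of dropping them. */
    static String describe(List<RecordError> errors) {
        if (errors.isEmpty()) {
            return GENERIC;
        }
        return GENERIC + ": " + errors.stream()
                .map(RecordError::toString)
                .collect(Collectors.joining(", "));
    }
}
```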
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r814964024 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent { override def preempt(): Unit = {} } -case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], -callback: AlterIsrCallback) extends ControllerEvent { +case class AlterPartitionReceived( + brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback Review comment: Fixed.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r814963257 ## File path: core/src/main/scala/kafka/controller/Election.scala ## @@ -53,17 +60,17 @@ object Election { * Elect leaders for new or offline partitions. * * @param controllerContext Context with the current state of the cluster - * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions + * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions Review comment: Yes. Fixed. I think I did a search and replace at some point.
[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704
jsancio commented on a change in pull request #11733: URL: https://github.com/apache/kafka/pull/11733#discussion_r814961954 ## File path: core/src/main/scala/kafka/controller/Election.scala ## @@ -40,7 +40,14 @@ object Election { val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) - leaderAndIsr.newLeaderAndIsr(leader, newIsr) + + if (!isr.contains(leader)) { +// The new leader is not in the old ISR so mark the partition a RECOVERING +leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr) + } else { +// Elect a new leader but keep the previous leader recovery state Review comment: Yes. The case I had in mind is: 1. The leader is elected using unclean leader election, e.g. leader: 1, recoveryState: RECOVERING. 2. The leader never sends AlterPartition and goes offline, e.g. leader: -1, recoveryState: RECOVERING. 3. The only ISR member (id 1) comes back online, e.g. leader: 1, recoveryState: RECOVERING.
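The election rule in the diff and the three-step scenario can be replayed with a small immutable model. Only the RECOVERING/RECOVERED states come from the discussion; the class and method names are hypothetical, and filtering the ISR by liveness is left out for brevity.

```java
import java.util.Collections;
import java.util.List;

enum RecoveryState { RECOVERED, RECOVERING }

// Hypothetical immutable leadership snapshot for one partition.
final class PartitionLeadership {
    static final int NO_LEADER = -1;
    final int leader;
    final List<Integer> isr;
    final RecoveryState recoveryState;

    PartitionLeadership(int leader, List<Integer> isr, RecoveryState recoveryState) {
        this.leader = leader;
        this.isr = isr;
        this.recoveryState = recoveryState;
    }
}

final class ElectionSketch {
    /**
     * Mirrors the rule in the diff: a leader outside the previous ISR means an unclean
     * election, so the ISR becomes [leader] and the state RECOVERING; otherwise the
     * previous recovery state is kept. (ISR filtering by liveness is omitted here.)
     */
    static PartitionLeadership elect(PartitionLeadership current, int newLeader) {
        if (!current.isr.contains(newLeader)) {
            return new PartitionLeadership(newLeader, Collections.singletonList(newLeader),
                    RecoveryState.RECOVERING);
        }
        return new PartitionLeadership(newLeader, current.isr, current.recoveryState);
    }

    /** The leader goes offline without sending AlterPartition; nothing else changes. */
    static PartitionLeadership leaderOffline(PartitionLeadership current) {
        return new PartitionLeadership(PartitionLeadership.NO_LEADER, current.isr, current.recoveryState);
    }
}
```

Starting from leader 0 with ISR [0], electing broker 1 gives RECOVERING. Taking the leader offline keeps RECOVERING, and re-electing broker 1 (now the only ISR member) is a clean election that still carries RECOVERING, which is exactly step 3 above.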
[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …
C0urante commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r814955840 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ## @@ -79,11 +82,21 @@ public int hashCode() { @Override public String toString() { -final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{"); -sb.append("className='").append(className).append('\''); -sb.append(", type=").append(type); -sb.append(", version='").append(version).append('\''); -sb.append('}'); -return sb.toString(); +return "ConnectorPluginInfo{" + "className='" + className + '\'' + +", type=" + type.toString() + +", version='" + version + '\'' + +'}'; +} + +public static final class NoVersionFilter { +@Override +public boolean equals(Object obj) { +return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj); +} + +@Override +public int hashCode() { +return super.hashCode(); +} Review comment: Ah yeah, compiler warning about overriding `equals` but not `hashCode`. Think we should leave a comment about that just so others don't wonder the same thing?
[GitHub] [kafka] guozhangwang commented on pull request #11705: KAFKA-9847: add config to set default store type
guozhangwang commented on pull request #11705: URL: https://github.com/apache/kafka/pull/11705#issuecomment-1051037462 I made another pass and it LGTM. @ableegoldman would you like to make another pass?
[GitHub] [kafka] guozhangwang commented on a change in pull request #11805: KAFKA-13692: include metadata wait time in total blocked time
guozhangwang commented on a change in pull request #11805: URL: https://github.com/apache/kafka/pull/11805#discussion_r814937598 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -223,7 +224,9 @@ private double totalBlockedTime(final Producer producer) { + getMetricValue(producer.metrics(), "txn-begin-time-ns-total") + getMetricValue(producer.metrics(), "txn-send-offsets-time-ns-total") + getMetricValue(producer.metrics(), "txn-commit-time-ns-total") -+ getMetricValue(producer.metrics(), "txn-abort-time-ns-total"); ++ getMetricValue(producer.metrics(), "txn-abort-time-ns-total") Review comment: This is a meta question: since the ns and ms measurement mechanisms differ, simply summing them as `ns + ms * 10^6` may not be accurate. It's probably worth checking whether the aggregated total blocked time makes sense. ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ## @@ -71,6 +74,10 @@ public KafkaProducerMetrics(Metrics metrics) { TXN_ABORT, "Total time producer has spent in abortTransaction in nanoseconds." ); +metadataWaitSensor = newLatencySensor( +METADATA_WAIT, +"Total time producer has spent waiting on metadata in " Review comment: This line seems incomplete?
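On the units question: the `*-time-ns-total` metrics in the diff are cumulative nanosecond totals, so any millisecond-based total has to be scaled by 10^6 before it is added. A minimal sketch, assuming the metric values have already been read into maps (any `-ms-` metric name used with it is hypothetical); it uses `java.util.concurrent.TimeUnit` for the scale factor rather than a hand-written constant.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class BlockedTime {
    /**
     * Sums cumulative blocked-time totals in nanoseconds.
     * nanosTotals: metrics already reported in ns (e.g. "txn-commit-time-ns-total").
     * millisTotals: hypothetical metrics reported in ms, converted before summing.
     */
    static double totalBlockedNanos(Map<String, Double> nanosTotals, Map<String, Double> millisTotals) {
        double total = 0.0;
        for (double ns : nanosTotals.values()) {
            total += ns;
        }
        // TimeUnit converts longs, so take the factor once and scale the doubles by it.
        long nanosPerMilli = TimeUnit.MILLISECONDS.toNanos(1);
        for (double ms : millisTotals.values()) {
            total += ms * nanosPerMilli;
        }
        return total;
    }
}
```

Converting the units this way does not address the other half of the question: totals gathered by different timing mechanisms may still not be directly comparable.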
[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …
mimaison commented on pull request #11572: URL: https://github.com/apache/kafka/pull/11572#issuecomment-1051008938 Thanks @C0urante for the review, all good suggestions. I believe I've addressed them all.
[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …
mimaison commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r814917568 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ## @@ -79,11 +82,21 @@ public int hashCode() { @Override public String toString() { -final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{"); -sb.append("className='").append(className).append('\''); -sb.append(", type=").append(type); -sb.append(", version='").append(version).append('\''); -sb.append('}'); -return sb.toString(); +return "ConnectorPluginInfo{" + "className='" + className + '\'' + +", type=" + type.toString() + +", version='" + version + '\'' + +'}'; +} + +public static final class NoVersionFilter { +@Override +public boolean equals(Object obj) { +return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj); +} + +@Override +public int hashCode() { +return super.hashCode(); +} Review comment: Without this method, the compilation fails. To be honest, I'm not sure if there's a way to avoid it.
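For readers wondering why the `equals` looks unusual: the diff does not show how `NoVersionFilter` is wired in, but it matches Jackson's custom-inclusion idiom. With `@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ...)`, Jackson calls `filter.equals(value)` and excludes the property when that returns `true`. Overriding `equals` alone trips the usual equals/hashCode check, hence the explicit `hashCode`. The dependency-free sketch below assumes that usage; the `"undefined"` sentinel stands in for `DelegatingClassLoader.UNDEFINED_VERSION`, and `shouldSerialize` is a hypothetical helper that imitates the serializer's decision.

```java
final class NoVersionFilterSketch {
    // Assumed sentinel; stands in for DelegatingClassLoader.UNDEFINED_VERSION.
    static final String UNDEFINED_VERSION = "undefined";

    /**
     * Value filter in the Jackson CUSTOM-inclusion style (assumed usage, e.g.
     * @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = NoVersionFilter.class)):
     * equals(value) == true means "leave this property out".
     * Deliberately asymmetric, so it must never be used as an ordinary value object.
     */
    static final class NoVersionFilter {
        @Override
        public boolean equals(Object obj) {
            return UNDEFINED_VERSION.equals(obj);
        }

        // Only here to satisfy the equals/hashCode check; the filter is never hashed.
        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }

    /** Imitates the serializer's inclusion decision for a version string. */
    static boolean shouldSerialize(String version) {
        return !new NoVersionFilter().equals(version);
    }
}
```

A short comment like the one on `hashCode` above is one way to answer the question of leaving a note for future readers.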
[GitHub] [kafka] C0urante commented on a change in pull request #11777: KAFKA-10000: Add producer fencing API to admin client (KIP-618)
C0urante commented on a change in pull request #11777: URL: https://github.com/apache/kafka/pull/11777#discussion_r814866192 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.message.InitProducerIdRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.InitProducerIdRequest; +import org.apache.kafka.common.requests.InitProducerIdResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.ProducerIdAndEpoch; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class FenceProducersHandler extends AdminApiHandler.Unbatched<CoordinatorKey, ProducerIdAndEpoch> { +private final Logger log; +private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + +public FenceProducersHandler( +LogContext logContext +) { +this.log = logContext.logger(DescribeTransactionsHandler.class); +this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); +} + +public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture( +Collection<String> transactionalIds +) { +return AdminApiFuture.forKeys(buildKeySet(transactionalIds)); +} + +private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) { +return transactionalIds.stream() +.map(CoordinatorKey::byTransactionalId) +.collect(Collectors.toSet()); +} + +@Override +public String apiName() { +return "fenceProducer"; +} + +@Override +public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { +return lookupStrategy; +} + +@Override +InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey key) { +if (key.type != 
FindCoordinatorRequest.CoordinatorType.TRANSACTION) { +throw new IllegalArgumentException("Invalid group coordinator key " + key + +" when building `InitProducerId` request"); +} +InitProducerIdRequestData data = new InitProducerIdRequestData() +.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) +.setProducerId(ProducerIdAndEpoch.NONE.producerId) +.setTransactionalId(key.idValue) +// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID, +// and shouldn't be used for any actual record writes +.setTransactionTimeoutMs(1); +return new InitProducerIdRequest.Builder(data); +} + +@Override +public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse( +Node broker, +CoordinatorKey key, +AbstractResponse abstractResponse +) { +InitProducerIdResponse response = (InitProducerIdResponse) abstractResponse; + +Errors error = Errors.forCode(response.data().errorCode()); +if (error != Errors.NONE) { +return handleError(key, error); +} + +Map<CoordinatorKey, ProducerIdAndEpoch> completed = Collections.singletonMap(key, new ProducerIdAndEpoch( +response.data().producerId(), +response.data().producerEpoch() +)); + +return new ApiResult<>(completed, Collections.emptyMap(), Collections.emptyList()); +} + +private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(CoordinatorKey transactionalIdKey, Errors error) { +switch (error) { Review comment: Ah, good point about extensibility. I'd personally opt to just leave a comment (and in
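Sending `InitProducerId` fences older producers because the transaction coordinator bumps the producer epoch for that transactional ID, and requests that carry an older epoch are then rejected. The toy coordinator below illustrates only that rule. It ignores aborting in-flight transactions, epoch exhaustion and timeouts, and every name in it (including the starting producer ID) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical (producerId, epoch) pair.
final class IdAndEpoch {
    final long producerId;
    final short epoch;

    IdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

// Toy transaction coordinator: only the epoch-bump-and-fence rule is modelled.
final class ToyTxnCoordinator {
    private final Map<String, IdAndEpoch> state = new HashMap<>();
    private long nextProducerId = 1000L;

    /** InitProducerId: allocate an id on first use, otherwise bump the epoch of the existing one. */
    IdAndEpoch initProducerId(String transactionalId) {
        IdAndEpoch current = state.get(transactionalId);
        IdAndEpoch next = (current == null)
                ? new IdAndEpoch(nextProducerId++, (short) 0)
                : new IdAndEpoch(current.producerId, (short) (current.epoch + 1));
        state.put(transactionalId, next);
        return next;
    }

    /** A request is accepted only if it carries the latest (producerId, epoch). */
    boolean accepts(String transactionalId, IdAndEpoch sender) {
        IdAndEpoch current = state.get(transactionalId);
        return current != null
                && current.producerId == sender.producerId
                && current.epoch == sender.epoch;
    }
}
```

In this model, the admin client's request is just another `initProducerId` call whose result is never used for writes. That is consistent with the 1 ms transaction timeout chosen in the diff.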
[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
viktorsomogyi commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050920500 Thank you Manikumar! I'll rebase the change then and ping you when I'm done (since it is late Friday here, it might be Monday).
[GitHub] [kafka] viktorsomogyi commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
viktorsomogyi commented on pull request #11748: URL: https://github.com/apache/kafka/pull/11748#issuecomment-1050913057 @mimaison, just saw your comment on KAFKA-13659. I can take a look at this next week. @urbandan and @dorwi already have a fix for KAFKA-13659 (and thus this) in our distro that we wanted to contribute back. If they have time, they may also review your solution or share their approach.
[jira] [Updated] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones
[ https://issues.apache.org/jira/browse/KAFKA-13695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marc updated KAFKA-13695: - Attachment: DeletionConceptTestbed > Low-traffic topics don't roll (and therefore compact) nor delete tombstones > --- > > Key: KAFKA-13695 > URL: https://issues.apache.org/jira/browse/KAFKA-13695 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.5.0, 3.1.0 >Reporter: Marc >Priority: Major > > I set up a testbed with some partitions and carefully inspected the behaviour > in the Kafka /data folder. > It looks like when the active segment qualifies for rolling, it is not actually > closed until a new record arrives. Thus, it cannot be compacted in a > timely and deterministic manner by means of max.compaction.lag.ms, for > instance. > The same problem occurs with delete.retention.ms. Once compaction > has happened and the canonical latest state of a key is a single tombstone in the > compacted tail, we must wait for an arbitrary record to arrive before > deletion is triggered, just as before. > I expected the log.preallocate property to create a new segment file once the current one was > marked for rolling and no new records had arrived, so that it could finally be rolled, but it seems to have no effect.
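The reported behaviour is consistent with the time-based roll condition being checked on the append path, so a segment that becomes old enough to roll is only closed when the next record arrives. Because the active segment is never cleaned, compaction and tombstone deletion wait as well. The toy log below shows only that effect. It leaves out size and index limits and the cleaner's own scheduling, and its class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy log: time-based rolling is evaluated only when a record is appended.
final class ToyLog {
    private final long segmentMs; // analogous to segment.ms
    private final List<Long> closedSegmentCreationTimes = new ArrayList<>();
    private long activeSegmentCreatedAt;

    ToyLog(long segmentMs, long nowMs) {
        this.segmentMs = segmentMs;
        this.activeSegmentCreatedAt = nowMs;
    }

    /** The roll check happens here, on append, and nowhere else. */
    void append(long nowMs) {
        if (nowMs - activeSegmentCreatedAt >= segmentMs) {
            closedSegmentCreationTimes.add(activeSegmentCreatedAt);
            activeSegmentCreatedAt = nowMs;
        }
        // Writing the record into the active segment is omitted.
    }

    /** Only closed (non-active) segments are candidates for compaction or deletion. */
    int closedSegments() {
        return closedSegmentCreationTimes.size();
    }
}
```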
[jira] [Updated] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones
[ https://issues.apache.org/jira/browse/KAFKA-13695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marc updated KAFKA-13695: - Attachment: (was: DeletionConceptTestbed)
[GitHub] [kafka] omkreddy commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
omkreddy commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050897685 Sorry for missing this PR. I will help review and merge it.
[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
viktorsomogyi commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050877629 @sachmo99 I pinged @rajinisivaram and @omkreddy by email; hopefully they respond, and I'd be happy to rebase this change on top of trunk and push it in.
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi edited a comment on pull request #9519: URL: https://github.com/apache/kafka/pull/9519#issuecomment-1050869998 @lbradstreet @ijuma @junrao I'd like to revive this. I know this is a bit old, but I'd like to close out dangling open issues. Is this needed in the Apache Kafka distribution? (We've been using Murmur3 in the Cloudera distro for 2 years now and haven't had any problems with it since, so it's somewhat time-tested.) If you think this is needed, please reply within a week and I'll resolve the conflicts; otherwise I'll close this PR to keep my own list clear. Thank you.
[GitHub] [kafka] viktorsomogyi commented on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi commented on pull request #9519: URL: https://github.com/apache/kafka/pull/9519#issuecomment-1050869998 @lbradstreet @ijuma @junrao I'd like to revive this. I know this is a bit old, but I'd like to close out dangling open issues. Is this needed in the Apache Kafka distribution? (We've been using Murmur3 in the Cloudera distro for 2 years now and haven't had any problems with it since, so it's somewhat time-tested.) If you think this is needed, please reply within a week; otherwise I'll close this PR to keep my own list clear. Thank you.
[jira] [Comment Edited] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval
[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061 ] Matteo edited comment on KAFKA-13678 at 2/25/22, 1:40 PM: -- hi guys, I'm Matteo and I'm working on the same team as Lorenzo. I don't follow your point about determinism. If the punctuator set the next ring time relative to the current event timestamp, determinism would still hold, even when reprocessing. Let's examine a practical use case: I want to monitor 2 sensors. The sensors are designed to send an "ok" signal and, sometimes, an "error" signal when things are wrong. Let's imagine we want to be notified when a sensor sends an "error" signal and no "ok" signal in the next 5 minutes. Something like "if you're in an error state and the situation doesn't change for 5 minutes, then I want to take a particular action". On the other hand, if we receive an error signal but an "ok" signal arrives in the next 5 minutes, then we don't care about the error. Now, let's imagine this situation: we receive an "error" signal from sensor 1 at event time t0. After a while we receive an "error" signal at event time t1, this time from sensor 2. We expect the logic to be woken up at time t0+5min and at time t1+5min to take the appropriate actions (as long as we do not receive any "ok" signal in the meantime). First, let's clarify that the time reference necessarily has to be the event time: indeed, if we receive an "error" event and ingestion stops for 10 minutes (for example because of a network problem), we don't want to trigger any action, since the sensor could actually have sent an "ok" signal in the meantime that we weren't able to consume yet. With the current punctuator semantics, this use case is impossible to implement. On the other hand, having the punctuator set the "wake up" trigger relative to the current event timestamp would do the job.
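The semantics requested here can be sketched without the Kafka Streams API. Keep one event-time deadline per sensor, cancel it on "ok", and fire every deadline that the advancing stream time passes. Because deadlines depend only on record timestamps, replaying the same input produces the same alerts. This is a hypothetical plain-Java model: the class name, the `process` signature and the 5-minute constant are illustrative, and it is not a proposal for the `Punctuator` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event-time watchdog; plain Java, no Kafka Streams API involved.
final class SensorWatchdog {
    static final long TIMEOUT_MS = 5 * 60 * 1000L; // "no ok within 5 minutes"

    // sensor id -> event-time deadline; TreeMap keeps the firing order deterministic
    private final Map<String, Long> deadlines = new TreeMap<>();
    private final List<String> alerts = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    /** Handles one record; stream time only advances with record timestamps. */
    void process(String sensor, String signal, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        fireExpired(); // deadlines passed by the new stream time fire before this record is applied
        if ("error".equals(signal)) {
            deadlines.putIfAbsent(sensor, timestampMs + TIMEOUT_MS); // first error starts the clock
        } else if ("ok".equals(signal)) {
            deadlines.remove(sensor);
        }
    }

    private void fireExpired() {
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= streamTime) {
                alerts.add(entry.getKey() + "@" + entry.getValue());
                it.remove();
            }
        }
    }

    List<String> alerts() {
        return alerts;
    }
}
```

If ingestion stops, no records arrive and nothing fires, which is the behavior the comment asks for during a network outage.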
Two notes here: 1) the "current" timestamp event is of course a "best effort" approach as the granularity (and, so, the precision) of the time measurement depends on the granularity of the incoming events. 2) the semantic of a component like that would remain the same no matter if you are in "real time" or "reprocess" situation, preserving the determinism (please, give me a counter-example that could explain why the determinism wouldn't hold). was (Author: JIRAUSER285799): hi guys, I'm Matteo and I'm working in the same team of Lorenzo. I can't get your point about determinism. If the punctuator set the next ring-time with respect to the current event-timestamp, the determinism would perfectly hold even in case of reprocessing. Let's examine a practical use case: I want to monitor 2 sensors. The sensors are designed to send an "ok" signal and, sometimes, an "error" signal, when things are wrong. Let's imagine we want to be notified when a sensor sends an "error" signal and no "ok" signals in the next 5 minutes. Something like "if you're in an error state and the situation doesn't change for 5 minutes, then I want to take a particular action". On the other hand, if we receive an error signal but an "ok" signal arrives in the next 5 minutes, then we don't care about the error. Now, let's imagine this situation: we receive an "error" signal from sensor 1, at event time t0. After a while we receive an "error signal" at event time t1, this time from sensor 2. We expect the behavior to be waked up at time t0+5min and at time t1+5min to take the appropriate actions (as soon as we do not receive any "ok" signal in the meanwhile). 
First, let's clarify that the time reference has necessarily to be the event time: indeed, if we receive an "error" event and the ingestion stops for 10 minutes (for example because of a network problem) we don't want to trigger any action as, actually, the sensor could have sent an "ok" signal in the meanwhile but we weren't able to consume it yet. With the current punctuator semantic, this use case is impossible to implement. On the other hand, making a punctuator to set the "wake up" trigger with respect to the current event timestamp would do the work. Two notes here: 1) the "current" timestamp event is of course a "best effort" approach as the granularity (and, so, the precision) of the time measurement depends on the granularity of the incoming events. 2) the semantic of a component like that would remain the same no matter if you are in "real time" or "reprocess" situation, preserving the determinism (please, give me a counter-example that could explain why the determinism wouldn't hold). > 2nd punctuation using STREAM_TIME does not respect scheduled interval > - > > Key: KAFKA-13678 > URL: https://issues.apache.org/jira/browse/KAFKA-13678 >
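The use case above can be sketched without Kafka at all. The following is a minimal, hypothetical model: the class `SensorTimeoutTracker`, its methods, and the "ok"/"error" strings are illustrative and are not a Kafka Streams API. It assumes events arrive in timestamp order and treats stream time as the largest event time seen so far, so a deadline can only fire when some later event arrives, which is the "best effort" granularity from note 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Kafka-free sketch: alert when a sensor reports "error" and no "ok"
 * follows within timeoutMs of EVENT time. Assumes in-order events; stream time is
 * the maximum event time observed so far.
 */
final class SensorTimeoutTracker {
    private final long timeoutMs;
    // sensor id -> event-time deadline, measured from the first "error" event
    private final Map<String, Long> deadlines = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    SensorTimeoutTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void onEvent(String sensor, String signal, long eventTime) {
        // Only event timestamps advance the clock, so an ingestion pause fires nothing.
        streamTime = Math.max(streamTime, eventTime);
        fireExpired();
        if ("error".equals(signal)) {
            // a repeated "error" keeps the original deadline: the state has not changed
            deadlines.putIfAbsent(sensor, eventTime + timeoutMs);
        } else if ("ok".equals(signal)) {
            deadlines.remove(sensor);
        }
    }

    private void fireExpired() {
        List<Map.Entry<String, Long>> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : deadlines.entrySet()) {
            if (e.getValue() <= streamTime) {
                expired.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        // deterministic order: by deadline, then by sensor id
        expired.sort((a, b) -> a.getValue().equals(b.getValue())
                ? a.getKey().compareTo(b.getKey())
                : Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : expired) {
            alerts.add(e.getKey() + "@" + e.getValue());
            deadlines.remove(e.getKey());
        }
    }

    List<String> alerts() {
        return List.copyOf(alerts);
    }
}
```

Because only event timestamps drive the clock in this sketch, replaying the same events yields the same alerts, which is the determinism argument made in the comment.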
[GitHub] [kafka] viktorsomogyi closed pull request #11491: KAFKA-13442: REST API endpoint for fetching a connector's config def
viktorsomogyi closed pull request #11491: URL: https://github.com/apache/kafka/pull/11491
[GitHub] [kafka] ijuma commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts
ijuma commented on a change in pull request #11788: URL: https://github.com/apache/kafka/pull/11788#discussion_r814763487
## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ##
@@ -461,27 +467,53 @@ private void postProcessAndValidateIdempotenceConfigs(final Map
         final Map originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
-
-        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
-        if (idempotenceEnabled()) {
-            boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
-            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-                throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+        final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean shouldDisableIdempotence = false;
+
+        // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
+        if (idempotenceEnabled) {
+            final int retries = this.getInt(RETRIES_CONFIG);
+            if (retries == 0) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+                }
+                log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries);
+                shouldDisableIdempotence = true;
             }
-            boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
             final short acks = Short.valueOf(acksStr);
-            if (userConfiguredAcks && acks != (short) -1) {
-                throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+            if (acks != (short) -1) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
                     "producer. Otherwise we cannot guarantee idempotence.");
+                }
+                log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks);
+                shouldDisableIdempotence = true;
             }
-            boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-            if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-                throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+            final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+            if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
                     " to use the idempotent producer.");
+                }
+                log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
+                    "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+                shouldDisableIdempotence = true;
             }
         }
+
+        if (shouldDisableIdempotence) {
+            configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+        }
+
+        // validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden
+        idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence;
Review comment: Maybe we can set this to `false` in the `shouldDisableIdempotence` block? Seems a bit more natural.
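A stdlib-only sketch of the flow in the diff, with the reviewer's suggestion applied (clearing `idempotenceEnabled` inside the `shouldDisableIdempotence` block), might look like the following. It is an illustration under assumptions, not the PR's code: `IdempotenceConfigSketch` is hypothetical, config keys are plain strings, `IllegalArgumentException` stands in for Kafka's `ConfigException`, the log calls are omitted, and `acks` is assumed to be already normalized so that "all" appears as "-1".

```java
import java.util.Map;

/**
 * Hypothetical sketch of the idempotence validation above. IllegalArgumentException
 * stands in for ConfigException; "acks" is assumed already normalized ("all" -> "-1").
 */
final class IdempotenceConfigSketch {
    static final int MAX_IN_FLIGHT_FOR_IDEMPOTENCE = 5;

    static boolean resolveIdempotence(Map<String, Object> configs, boolean userConfiguredIdempotence) {
        boolean idempotenceEnabled = (Boolean) configs.get("enable.idempotence");
        boolean shouldDisableIdempotence = false;

        if (idempotenceEnabled) {
            if ((Integer) configs.get("retries") == 0) {
                if (userConfiguredIdempotence) {
                    throw new IllegalArgumentException("Must set retries to non-zero when using the idempotent producer.");
                }
                shouldDisableIdempotence = true;
            }
            if (Short.parseShort((String) configs.get("acks")) != (short) -1) {
                if (userConfiguredIdempotence) {
                    throw new IllegalArgumentException("Must set acks to all in order to use the idempotent producer.");
                }
                shouldDisableIdempotence = true;
            }
            if ((Integer) configs.get("max.in.flight.requests.per.connection") > MAX_IN_FLIGHT_FOR_IDEMPOTENCE) {
                if (userConfiguredIdempotence) {
                    throw new IllegalArgumentException("Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.");
                }
                shouldDisableIdempotence = true;
            }
        }

        if (shouldDisableIdempotence) {
            configs.put("enable.idempotence", false);
            idempotenceEnabled = false; // the reviewer's suggestion: update the local flag here
        }
        return idempotenceEnabled;
    }
}
```

Keeping the flag update next to `configs.put(...)` means the local variable and the config map are changed in one place, which is presumably why the reviewer finds it more natural.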
[jira] [Created] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones
Marc created KAFKA-13695:
Summary: Low-traffic topics don't roll (and therefore compact) nor delete tombstones
Key: KAFKA-13695
URL: https://issues.apache.org/jira/browse/KAFKA-13695
Project: Kafka
Issue Type: Improvement
Affects Versions: 3.1.0, 2.5.0
Reporter: Marc

I set up a testbed with some partitions and carefully inspected the behaviour in the Kafka /data folder. It looks like when the active segment qualifies for rolling, it is not actually closed until a new record arrives. Thus, it cannot be compacted in a timely and deterministic manner by means of, for instance, max.compaction.lag.ms. The same problem occurs with delete.retention.ms: once compaction has happened and the canonical latest state of a key is a single tombstone in the compacted tail, we must wait for some arbitrary record to arrive before deletion is triggered, just as before. I expected the log.preallocate property to create a new segment file once the current one was marked for rolling and no new records had arrived, so that it could finally be rolled, but it seems to have nothing to do with this.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
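The reported behaviour can be illustrated with a toy model, assuming (as the report suggests) that the roll condition is only evaluated when a record is appended. `LazyRollModel` and its methods are hypothetical and greatly simplified; this is not Kafka's log or segment code.

```java
/**
 * Hypothetical toy model: the active segment's age is checked only when a record
 * is appended, so a stale segment on an idle topic is never closed.
 */
final class LazyRollModel {
    private final long segmentMs;
    private long activeSegmentCreatedAt;
    private int closedSegments = 0;

    LazyRollModel(long segmentMs, long createdAt) {
        this.segmentMs = segmentMs;
        this.activeSegmentCreatedAt = createdAt;
    }

    /** The roll condition is evaluated here, on the append path only. */
    void append(long now) {
        if (now - activeSegmentCreatedAt >= segmentMs) {
            closedSegments++;             // the old active segment is closed
            activeSegmentCreatedAt = now; // the new record starts a fresh segment
        }
    }

    /** Whether the active segment has reached its age limit at time `now`. */
    boolean qualifiesForRoll(long now) {
        return now - activeSegmentCreatedAt >= segmentMs;
    }

    int closedSegments() {
        return closedSegments;
    }
}
```

In this model, an idle topic's active segment can qualify for rolling indefinitely without being closed, so it never becomes eligible for compaction or tombstone deletion until the next record arrives.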
[GitHub] [kafka] showuon commented on a change in pull request #11705: KAFKA-9847: add config to set default store type
showuon commented on a change in pull request #11705: URL: https://github.com/apache/kafka/pull/11705#discussion_r814753767
## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ##
@@ -81,14 +81,19 @@ public StreamsBuilder() {
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }
-    protected StreamsBuilder(final TopologyConfig topologyConfigs) {
+    /**
+     * Create a {@code StreamsBuilder} instance.
+     *
+     * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
+     */
+    public StreamsBuilder(final TopologyConfig topologyConfigs) {
         topology = getNewTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }
     protected Topology getNewTopology(final TopologyConfig topologyConfigs) {
Review comment: Yes, I agree. This should be part of KIP-591, making `TopologyConfig` a public API. I've moved it out of the `internals.namedtopology` package now. Thanks.
[jira] [Commented] (KAFKA-13693) Config in memory got inconsistency value
[ https://issues.apache.org/jira/browse/KAFKA-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498090#comment-17498090 ] Augusto Hack commented on KAFKA-13693: -- [~showuon] here are some additional details:
* Kafka version: 2.8.1
* partitions: 3
* policy: compact
* the znode for the topic was never modified (version was 0, and ctime and mtime were equal)
* the brokers existed before the topic was created (checked via the timestamps of the znodes in `/brokers/ids`)

Observed bug:
* The LogConfig object of one of the partitions on the leader had the delete flag set to true, while every other LogConfig had the correct values (true for compact, false for delete). This was checked with a memory dump, queried using Memory Analyzer (MAT).

> Config in memory got inconsistency value
>
> Key: KAFKA-13693
> URL: https://issues.apache.org/jira/browse/KAFKA-13693
> Project: Kafka
> Issue Type: Bug
> Reporter: Aiqin Zhang
> Priority: Minor
> Attachments: Screenshot from 2022-02-25 14-34-36.png
>
> We have a kafka cluster with a topic which configured as clean up policy = compact.
> But found after timeout timer, the data got deleted on one of the partitions.
>
> Further investigation shows that in memory the value is actually delete which is inconsistent.
>
> Topic was created via Terraform in bulk, around 250 together and returned with success.
>
> We did the broker restart to correct the setting in memory.
[jira] [Comment Edited] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval
[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061 ] Matteo edited comment on KAFKA-13678 at 2/25/22, 11:07 AM: --- hi guys, I'm Matteo and I'm working on the same team as Lorenzo. I don't get your point about determinism. If the punctuator set the next ring time relative to the current event timestamp, determinism would hold perfectly, even when reprocessing. Let's examine a practical use case: I want to monitor 2 sensors. The sensors are designed to send an "ok" signal and, sometimes, an "error" signal when things are wrong. Let's say we want to be notified when a sensor sends an "error" signal and no "ok" signal within the next 5 minutes. Something like "if you're in an error state and the situation doesn't change for 5 minutes, then I want to take a particular action". On the other hand, if we receive an "error" signal but an "ok" signal arrives within the next 5 minutes, then we don't care about the error. Now imagine this situation: we receive an "error" signal from sensor 1 at event time t0. A while later we receive an "error" signal at event time t1, this time from sensor 2. We expect to be woken up at t0+5min and at t1+5min to take the appropriate actions (provided we don't receive any "ok" signal in the meantime). First, let's clarify that the time reference must be event time: if we receive an "error" event and ingestion stops for 10 minutes (for example, because of a network problem), we don't want to trigger any action, since the sensor could have sent an "ok" signal in the meantime that we just haven't been able to consume yet. With the current punctuator semantics, this use case is impossible to implement. On the other hand, a punctuator that sets its "wake up" trigger relative to the current event timestamp would do the job.
Two notes here: 1) the "current" event timestamp is, of course, a best-effort approach, since the granularity (and thus the precision) of the time measurement depends on the granularity of the incoming events. 2) the semantics of such a component would remain the same whether you are processing in real time or reprocessing, preserving determinism (please give me a counter-example showing why determinism wouldn't hold).
> 2nd punctuation using STREAM_TIME does not respect scheduled interval
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
> Issue Type: Improvement
[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
kowshik commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814678429
## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ##
@@ -386,11 +397,143 @@ class ReplicaFetcherThread(name: String,
   }
   /**
-   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   * the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
Review comment: This method does a lot of things, and it is worth thinking about how to simplify it. In its current form, it will be hard to test.
[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
kowshik commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948
## File path: core/src/main/scala/kafka/log/BaseIndex.scala ##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and rename of the index files.
Review comment:
> This class represents a common abstraction for operations like delete and rename of the index files.

This class is slim in functionality, and I don't feel there is any real benefit to introducing it. Also, going forward, it is not clear to me which operations belong in this class and which ones don't. I feel that the earlier design without this base class was simpler. Are we planning to add new functionality to this class in the future?
[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
kowshik commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948
## File path: core/src/main/scala/kafka/log/BaseIndex.scala ##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and rename of the index files.
Review comment:
> This class represents a common abstraction for operations like delete and rename of the index files.

This class is slim in functionality, and I don't feel there is any real benefit to introducing it. Also, going forward, it is not clear to me which operations belong in this class and which ones don't. I feel that the earlier design without this base class was simpler. Were you planning to add new functionality to this class in the future?
[jira] [Commented] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval
[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061 ] Matteo commented on KAFKA-13678: I don't get your point about determinism. If the punctuator set the next ring time relative to the current event timestamp, determinism would hold perfectly, even when reprocessing. Let's examine a practical use case: I want to monitor 2 sensors. The sensors are designed to send an "ok" signal and, sometimes, an "error" signal when things are wrong. Let's say we want to be notified when a sensor sends an "error" signal and no "ok" signal within the next 5 minutes. Something like "if you're in an error state and the situation doesn't change for 5 minutes, then I want to take a particular action". On the other hand, if we receive an "error" signal but an "ok" signal arrives within the next 5 minutes, then we don't care about the error. Now imagine this situation: we receive an "error" signal from sensor 1 at event time t0. A while later we receive an "error" signal at event time t1, this time from sensor 2. We expect to be woken up at t0+5min and at t1+5min to take the appropriate actions (provided we don't receive any "ok" signal in the meantime). First, let's clarify that the time reference must be event time: if we receive an "error" event and ingestion stops for 10 minutes (for example, because of a network problem), we don't want to trigger any action, since the sensor could have sent an "ok" signal in the meantime that we just haven't been able to consume yet. With the current punctuator semantics, this use case is impossible to implement. On the other hand, a punctuator that sets its "wake up" trigger relative to the current event timestamp would do the job.
Two notes here: 1) the "current" event timestamp is, of course, a best-effort approach, since the granularity (and thus the precision) of the time measurement depends on the granularity of the incoming events. 2) the semantics of such a component would remain the same whether you are processing in real time or reprocessing, preserving determinism (please give me a counter-example showing why determinism wouldn't hold).
> 2nd punctuation using STREAM_TIME does not respect scheduled interval
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.0.0
> Reporter: Lorenzo Cagnatel
> Priority: Major
>
> Scheduling a punctuator using stream time, the first punctuation occurs immediately as documented, but the second one is not triggered at *t_schedule + interval*: it can happen before that time.
> For example, assume that we schedule a punctuation every 10 sec at timestamp 5 (t5). The system now works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs only 5 seconds after the first one, breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
>  * Schedules a punctuation for the processor
>  *
>  * @param interval the interval in milliseconds
>  * @param type the punctuation type
>  * @throws IllegalStateException if the current node is not null
>  */
> public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
>     switch (type) {
>         case STREAM_TIME:
>             // align punctuation to 0L, punctuate as soon as we have data
>             return schedule(0L, interval, type, punctuator);
>         case WALL_CLOCK_TIME:
>             // align punctuation to now, punctuate after interval has elapsed
>             return schedule(time.milliseconds() + interval, interval, type, punctuator);
>         default:
>             throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
>     }
> }{code}
> when, in case of stream time, it calls *schedule* with {*}startTime=0{*}.
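The difference between the current and the proposed schedule can be reproduced with a small model. This is a simplification written for illustration, not the actual StreamTask or punctuation-queue code: `PunctuationAlignment` is hypothetical, a schedule fires once stream time reaches its deadline, and the next deadline is derived from the previous one (skipping any intervals that were missed entirely). A start time of 0 models the `schedule(0L, interval, ...)` call quoted above; a start time equal to the first observed stream time models the behaviour proposed in the issue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of stream-time punctuation scheduling. */
final class PunctuationAlignment {

    /** Next deadline after firing, computed from the previous deadline. */
    static long next(long scheduled, long interval, long streamTime) {
        long next = scheduled + interval;
        if (streamTime >= next) {
            // whole intervals were skipped: jump past them instead of firing repeatedly
            long missed = (streamTime - scheduled) / interval;
            next = scheduled + (missed + 1) * interval;
        }
        return next;
    }

    /** Returns the stream times at which a punctuation fires. */
    static List<Long> punctuations(long startTime, long interval, long[] streamTimes) {
        List<Long> fired = new ArrayList<>();
        long scheduled = startTime;
        for (long t : streamTimes) {
            if (t >= scheduled) {
                fired.add(t);
                scheduled = next(scheduled, interval, t);
            }
        }
        return fired;
    }
}
```

With records at t5 through t25 and an interval of 10, a start time of 0 fires at t5, t10 and t20, matching the first table; a start time of 5 fires at t5, t15 and t25, matching the proposed one.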
[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
kowshik commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948
## File path: core/src/main/scala/kafka/log/BaseIndex.scala ##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and rename of the index files.
Review comment:
> This class represents a common abstraction for operations like delete and rename of the index files.

This class is slim in functionality, and I don't feel there is any real benefit to introducing it. It is not clear to me, going forward, which operations belong in this class and which ones don't. Are you planning to add new functionality to this class in the future, and is that why you want to introduce it in this PR? If not, I feel that the earlier design without this base class was simpler.

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##
@@ -623,40 +660,25 @@ abstract class AbstractFetcherThread(name: String,
   }
   /**
-   * Handle the out of range error. Return false if
-   * 1) the request succeeded or
-   * 2) was fenced and this thread haven't received new epoch,
-   * which means we need not backoff and retry. True if there was a retriable error.
-   */
-  private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState,
-                                    requestEpoch: Optional[Integer]): Boolean = {
-    try {
-      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
-      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
-        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
-      false
-    } catch {
-      case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition, requestEpoch)
-
-      case e @ (_ : UnknownTopicOrPartitionException |
-                _ : UnknownLeaderEpochException |
-                _ : NotLeaderOrFollowerException) =>
-        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        true
-
-      case e: Throwable =>
-        error(s"Error getting offset for partition $topicPartition", e)
-        true
-    }
-  }
-
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch offset.
+   * It returns the next fetch state. It fetches the log-start-offset or local-log-start-offset based on
+   * `fetchFromLocalLogStartOffset` flag. This is used in truncation by passing it to the given `truncateAndBuild`
+   * function.
+   *
+   * @param topicPartition topic partition
+   * @param topicId topic id
+   * @param currentLeaderEpoch current leader epoch maintained by this follower replica.
+   * @param truncateAndBuild Function to truncate for the given epoch and offset. It returns the next fetch offset value.
+   * @param fetchFromLocalLogStartOffset Whether to fetch from local-log-start-offset or log-start-offset. If true, it
+   *                                     requests the local-log-start-offset from the leader, else it requests
+   *                                     log-start-offset from the leader. This is used in sending the value to the
+   *                                     given `truncateAndBuild` function.
+   * @return
    */
-  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+  private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
Review comment: This method `fetchOffsetAndApplyTruncateAndBuild` is currently doing a number of things, as is clear from the method name. It will be hard to cover all the cases in a unit test.
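One way to address the testability concern is to split the method into a pure offset choice and a separate application of the caller's truncation function. The sketch below is hypothetical: `FetchOffsetSketch`, `LeaderOffsets`, and the method names are illustrative, offsets are plain `long`s, and `truncateAndBuild` only receives the chosen offset, whereas the function described in the diff also takes the leader epoch.

```java
import java.util.function.LongUnaryOperator;

/**
 * Hypothetical sketch: separate "pick the leader offset" from "truncate and build"
 * so each step can be unit-tested on its own. Not the AbstractFetcherThread API.
 */
final class FetchOffsetSketch {

    /** Stand-in for the offsets the leader would report. */
    static final class LeaderOffsets {
        final long logStartOffset;
        final long localLogStartOffset;

        LeaderOffsets(long logStartOffset, long localLogStartOffset) {
            this.logStartOffset = logStartOffset;
            this.localLogStartOffset = localLogStartOffset;
        }
    }

    /** Step 1: a pure choice between the two offsets. */
    static long selectOffset(LeaderOffsets leader, boolean fetchFromLocalLogStartOffset) {
        return fetchFromLocalLogStartOffset ? leader.localLogStartOffset : leader.logStartOffset;
    }

    /** Step 2: pass the chosen offset to the caller-supplied truncation function. */
    static long fetchOffsetAndApply(LeaderOffsets leader, boolean fetchFromLocalLogStartOffset,
                                    LongUnaryOperator truncateAndBuild) {
        return truncateAndBuild.applyAsLong(selectOffset(leader, fetchFromLocalLogStartOffset));
    }
}
```

Each step can then be tested in isolation, with the leader lookup and the truncation logic replaced by simple stubs.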
[GitHub] [kafka] ecararus edited a comment on pull request #11803: KAFKA-13691: Rename target topic to custom name
ecararus edited a comment on pull request #11803: URL: https://github.com/apache/kafka/pull/11803#issuecomment-1050407569

Related to issue: [KAFKA-13691](https://issues.apache.org/jira/browse/KAFKA-13691)

When I use MirrorMaker, it replicates topics under a different name: the target topic name is built as sourceClusterAlias + separator + topic (the implementation is [here](https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java#L49)).

The proposed improvement consists of adding a new config property key, target.replication.policy.topic_map, e.g.:

source->target.replication.policy.class = org.apache.kafka.connect.mirror.RenameTopicReplicationPolicy
source->target.replication.policy.separator = .
source->target.replication.policy.topic_map = SOURCER_TOPIC_A,DESTINATION_TOPIC_A; SOURCER_TOPIC_B,DESTINATION_TOPIC_B;

and adding a new RenameTopicReplicationPolicy, which will be responsible for:
1. overriding the configure method to load target.replication.policy.topic_map into a Map where the key is the source topic name and the value is the target topic name;
2. overriding formatRemoteTopic so that the target topic name is taken from the config:

@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    String targetTopic = topicMap.containsKey(topic) ? topicMap.get(topic) : topic;
    return super.formatRemoteTopic(sourceClusterAlias, targetTopic);
}
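The two responsibilities listed in the proposal can be sketched with the standard library alone. `RenameTopicSketch` is hypothetical: a real policy would extend MirrorMaker's `DefaultReplicationPolicy`, and here `formatRemoteTopic` simply assumes the default `sourceClusterAlias + separator + topic` naming described in the comment. The parsing follows the `topic_map` example above (pairs separated by `;`, source and target separated by `,`).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, stdlib-only sketch of the proposed topic renaming. */
final class RenameTopicSketch {

    /** Parses "SRC_A,DST_A; SRC_B,DST_B;" into {SRC_A=DST_A, SRC_B=DST_B}. */
    static Map<String, String> parseTopicMap(String value) {
        Map<String, String> topicMap = new HashMap<>();
        for (String pair : value.split(";")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate blank entries
            }
            String[] parts = trimmed.split(",");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected SOURCE,TARGET but got: " + trimmed);
            }
            topicMap.put(parts[0].trim(), parts[1].trim());
        }
        return topicMap;
    }

    /** Assumes the default naming: alias + separator + (possibly renamed) topic. */
    static String formatRemoteTopic(String sourceClusterAlias, String separator,
                                    String topic, Map<String, String> topicMap) {
        // unmapped topics keep their original name, as in the proposal
        String targetTopic = topicMap.getOrDefault(topic, topic);
        return sourceClusterAlias + separator + targetTopic;
    }
}
```

Because the proposed `formatRemoteTopic` delegates to `super.formatRemoteTopic`, a mapped topic would, under the default naming assumed here, still carry the source alias prefix (for example `source.DESTINATION_TOPIC_A`) rather than the bare target name.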
[jira] [Commented] (KAFKA-12260) PartitionsFor should not return null value
[ https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498036#comment-17498036 ] Bruno Cadonna commented on KAFKA-12260:

[~calohmn] Thanks for the heads-up!

> PartitionsFor should not return null value
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Minor
> Fix For: 3.0.0
>
> consumer.partitionsFor() could return a null value when the topic was not found.
> This was not properly documented and was error-prone when the return type was
> a list. We should fix the logic internally to prevent partitionsFor from returning
> a null result.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-12260) PartitionsFor should not return null value
[ https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-12260. Resolution: Fixed
[jira] [Updated] (KAFKA-12260) PartitionsFor should not return null value
[ https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-12260: Fix Version/s: 3.0.0
[jira] [Commented] (KAFKA-12260) PartitionsFor should not return null value
[ https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498027#comment-17498027 ] Carsten Lohmann commented on KAFKA-12260: It looks like this is fixed in Kafka client 3.0.0 and 3.1.0: https://github.com/apache/kafka/commit/e4f2f6f6e82cafbdea785d53521b96fe062e172d
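Application code that may still run against consumer clients older than the fix version noted above, where, per this issue, `consumer.partitionsFor()` could return null for a topic that was not found, can normalize the result defensively. This is a minimal sketch; the helper name `PartitionsForCompat.orEmpty` is hypothetical and is not part of the Kafka API.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper for callers that may run against pre-fix consumer clients,
// where partitionsFor() could return null instead of a list (KAFKA-12260).
final class PartitionsForCompat {
    private PartitionsForCompat() {}

    // Runs the lookup and maps a null result to an empty, immutable list so callers
    // can iterate or call isEmpty() without a separate null check.
    static <T> List<T> orEmpty(Supplier<List<T>> lookup) {
        List<T> result = lookup.get();
        return result == null ? Collections.emptyList() : result;
    }
}
```

Usage would look like `List<PartitionInfo> parts = PartitionsForCompat.orEmpty(() -> consumer.partitionsFor("my-topic"));`, where `consumer` is an existing `KafkaConsumer` and `"my-topic"` is a placeholder. Once every deployment uses a client that includes the fix, the wrapper becomes a harmless no-op.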
[GitHub] [kafka] RivenSun2 commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
RivenSun2 commented on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050676944

@showuon Sorry, I made a mistake: the `map` returned by `config.valuesWithPrefixOverride(prefix)` is not a normal map; it is of type `AbstractConfig.RecordingMap`. `RecordingMap#get(Object key)` calls the `AbstractConfig#ignore(String key)` method, which changes the `AbstractConfig.used` variable. Forgive my mistakes.

However, if we add `assertFalse(config.unknown().contains("prefix.sasl.mechanism"));` after `valuesWithPrefixOverride.get("sasl.mechanism")`, the test case will fail, because unknownKeys is affected only by originals and values: its value is originalKeys.removeAll(valueKeys).
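The two behaviors described in this comment can be modeled in a few lines to see why an `unknown()` assertion after the `get` call would fail. This is a deliberately simplified, hypothetical model, not Kafka's `AbstractConfig`: the class `ConfigBookkeepingSketch` and its method names are assumptions. As in the original `testValuesWithPrefixOverride` assertions, a read marks both the prefixed and the bare key as used, while `unknown()` is computed only from originals and value keys.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the bookkeeping discussed above; NOT Kafka's AbstractConfig.
class ConfigBookkeepingSketch {
    private final Map<String, Object> originals;
    private final Set<String> valueKeys;          // keys present in the parsed values
    private final Set<String> used = new HashSet<>();

    ConfigBookkeepingSketch(Map<String, Object> originals, Set<String> valueKeys) {
        this.originals = originals;
        this.valueKeys = valueKeys;
    }

    // unused = original keys not read yet; shrinks as keys are read.
    Set<String> unused() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    // unknown = original keys minus value keys; reads never change it.
    Set<String> unknown() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(valueKeys);
        return keys;
    }

    private void ignore(String key) {
        used.add(key);
    }

    // A map whose get() records "<prefix><key>" and the bare key as used,
    // loosely mirroring what the comment says RecordingMap#get does via ignore(...).
    Map<String, Object> recordingView(Map<String, Object> values, String prefix) {
        return new HashMap<String, Object>(values) {
            @Override
            public Object get(Object key) {
                if (key instanceof String) {
                    String bare = (String) key;
                    ignore(prefix + bare);
                    ignore(bare);
                }
                return super.get(key);
            }
        };
    }
}
```

In this model, reading `sasl.mechanism` through the view removes both `sasl.mechanism` and `prefix.sasl.mechanism` from `unused()`, but `prefix.sasl.mechanism` stays in `unknown()`, because that set depends only on originals and value keys.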
[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
showuon commented on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050660740

> In fact, in the testcases we discussed, `config.unused()` and `config.unknown()` will not change after `TestSecurityConfig config` is initialized. The following `config.valuesWithPrefixOverride(prefix)` and `valuesWithPrefixOverride.get()` will not cause the return value of `unused()` and `unknown()` to change

Thanks for the explanation, but I don't think it is correct. If we check the original test below in `testValuesWithPrefixOverride`:

```java
// prefix overrides global
assertTrue(config.unused().contains("prefix.sasl.mechanism"));
assertTrue(config.unused().contains("sasl.mechanism"));
assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism"));
assertFalse(config.unused().contains("sasl.mechanism"));
assertFalse(config.unused().contains("prefix.sasl.mechanism"));
```

we can see that `unused` was expected to contain those 2 configs. Then, after `valuesWithPrefixOverride.get("sasl.mechanism")`, they are used, that is, no longer contained in the `unused` set. So, what I mean is that since we've changed the 1st assert to `unknown`:

```java
assertTrue(config.unknown().contains("prefix.sasl.mechanism"));
```

then we should also change the last assert to assert `unknown`:

```java
assertFalse(config.unknown().contains("prefix.sasl.mechanism"));
```

Does that make sense?
[GitHub] [kafka] RivenSun2 edited a comment on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys
RivenSun2 edited a comment on pull request #11800: URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050656591

In fact, in the test cases we discussed in `AbstractConfigTest`, `config.unused()` and `config.unknown()` will not change after `TestSecurityConfig config` is initialized. The subsequent calls to `config.valuesWithPrefixOverride(prefix)` and `valuesWithPrefixOverride.get()` will not change the return values of `unused()` and `unknown()`. The main purpose of these test cases is to check whether the map returned by `AbstractConfig#valuesWithPrefixOverride` meets our expectations under different conditions. Thanks.
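What these tests check about the returned map, that a prefixed original overrides the global value (the "prefix overrides global" case quoted in this thread), can be sketched as below. This is a hypothetical simplification, not the `AbstractConfig` source: the class and method names are illustrative, type parsing and used-key recording are omitted, and restricting overrides to keys the config definition recognizes is an assumption about the real behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of prefix-override resolution; NOT Kafka's implementation.
final class PrefixOverrideSketch {
    private PrefixOverrideSketch() {}

    // Starts from the un-prefixed values, then lets "<prefix><key>" from the originals
    // replace the value for every key that the definition recognizes (assumed rule).
    static Map<String, Object> valuesWithPrefixOverride(Map<String, Object> values,
                                                        Map<String, Object> originals,
                                                        Set<String> definedKeys,
                                                        String prefix) {
        Map<String, Object> result = new HashMap<>(values);
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                String stripped = key.substring(prefix.length());
                if (definedKeys.contains(stripped)) {
                    result.put(stripped, e.getValue());
                }
            }
        }
        return result;
    }
}
```

With originals `sasl.mechanism=PLAIN` and `prefix.sasl.mechanism=GSSAPI` (illustrative values), looking up `sasl.mechanism` in the result gives `GSSAPI`, while a prefixed key with no matching definition is simply not copied into the map.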