[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


showuon commented on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051784959


   > unknownKeys contains neither usedKeys nor unusedKeys.
   > Only configKeys that are not officially recognized by kafka will be put 
into unknownKeys.
   
   Yes, this is what I expected. Thanks.
   But that also confuses me: if the `unknown` keys contain neither `used` nor 
`unused` keys, why would this test fail after we assert on `unknown`?
   
   > However, after `valuesWithPrefixOverride.get("sasl.mechanism")`, adding 
`assertFalse(config.unknown().contains("prefix.sasl.mechanism"));` makes the 
testCase fail to verify, because unknownKeys is only affected by originals and 
values; its value is originalKeys.removeAll(valueKeys).
   
   In the above test case, we originally expect that `prefix.sasl.mechanism` is 
`used` and is not in the `unused` keys. After this PR, we should expect that 
`prefix.sasl.mechanism` is `used`, not in the `unused` keys, and **NOT** in the 
`unknown` keys, right? Did I miss anything here?
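   
   A minimal sketch of the assertions being discussed, assuming the `unknown()` 
accessor added by this PR and a `TestSecurityConfig`-style test config (the names 
here are illustrative, not the exact test):
   ```java
   Properties props = new Properties();
   props.put("sasl.mechanism", "PLAIN");
   props.put("prefix.sasl.mechanism", "GSSAPI");
   TestSecurityConfig config = new TestSecurityConfig(props);
   
   // the prefixed map records "prefix.sasl.mechanism" as used when we read "sasl.mechanism"
   Map<String, Object> withPrefix = config.valuesWithPrefixOverride("prefix.");
   withPrefix.get("sasl.mechanism");
   
   assertFalse(config.unused().contains("prefix.sasl.mechanism"));   // used, so not unused
   assertFalse(config.unknown().contains("prefix.sasl.mechanism"));  // the new assertion under discussion
   ```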
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away

2022-02-25 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498411#comment-17498411
 ] 

RivenSun commented on KAFKA-13694:
--

Hi [~guozhang], I looked at the code on the KafkaProducer side again.

In the Sender#failBatch(...) method, when returning data to the user's 
callback, we only return the *PartitionResponse.error* field; we do not return 
the *PartitionResponse.recordErrors* field.



1. The PartitionResponse.errorMessage field cannot solve the current problem.
Sometimes the errorMessage field has no value at all, as in the 
RecordTooLargeException case; sometimes its content does not tell the user why 
the send failed, as in this JIRA's example, where 
PartitionResponse.errorMessage = rve.invalidException.getMessage.

2. We need to fill the PartitionResponse.recordErrors information into the 
"Exception exception" passed to the user callback, and we may need to add a 
toString method to the ProduceResponse.RecordError class.
In fact, as I said in the JIRA, the PartitionResponse#toString method already 
calls RecordError#toString, but until now RecordError has lacked a toString 
method.
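
For illustration, a minimal sketch of such a toString() for 
ProduceResponse.RecordError (the field names follow the existing class; the 
exact message format is just a suggestion):
{code:java}
// Sketch only: a toString() so recordErrors become readable when
// PartitionResponse#toString prints them.
@Override
public String toString() {
    return "RecordError("
        + "batchIndex=" + batchIndex
        + ", message='" + message + "'"
        + ")";
}
{code}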

What do you think? 
Thanks.

> Some InvalidRecordException messages are thrown away
> 
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> Through the logs of the producer and server, we do not know the reason for 
> the failure of sending, only that the message was rejected by the server.
> You can compare the RecordTooLargeException testCase, we can clearly know the 
> reason for the failure from the producer, and the server will not print the 
> log (the reason will be explained later)
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyze the validateMessagesAndAssignOffsets method,
> In the LogValidator#validateRecord method, validateKey and validateTimestamp 
> are called, and the error information of all messages is obtained: 
> Seq[ApiRecordError];
> In the subsequent processRecordErrors(recordErrors) method, currently only 
> special processing is done for Errors.INVALID_TIMESTAMP, because the ERROR 
> returned by validateKey is still the ordinary Errors.INVALID_RECORD, so the 
> code will run to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
> "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information of each 
> recordError, but we did not put the errors information into the message of 
> InvalidRecordException.
> 2).The exception thrown by processRecordErrors will be caught by 
> ReplicaManager#appendToLocalLog(...), we continue to analyze the 
> `catchException code` of appendToLocalLog.
> Here, we can know the RecordTooLargeException, why the server does not print 
> the log.
> Under case rve: RecordValidationException,
> The server prints the log: processFailedRecord method, 
> and sends a response to the client: LogAppendResult method
> In these two methods, we can find that we only use rve.invalidException,
> For rve.recordErrors, the server neither prints it nor returns it to the 
> client.
> 3.Solution
> Two solutions, I prefer the second
> 1)Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns 
> Errors.INVALID_RECORD_WITHOUT_KEY,
> In the processRecordErrors 

[GitHub] [kafka] RivenSun2 commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


RivenSun2 commented on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051735512


   Hi @showuon 
   Currently the logic of the `unknown()` method is exactly what you would 
expect.
   `unknownKeys=originalKeys.removeAll(valueKeys)`
   
   `unknownKeys` contains neither `usedKeys` nor `unusedKeys`.
   Only configKeys that are not officially recognized by kafka will be put into 
`unknownKeys`.
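   
   Put differently, a small sketch of that computation (set names assumed; 
`removeAll` here is plain `java.util.Set` semantics):
   ```java
   Set<String> unknownKeys = new HashSet<>(originalKeys); // everything the user passed in
   unknownKeys.removeAll(valueKeys);                      // minus every key Kafka defines
   // what remains are keys Kafka does not recognize at all
   ```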
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815270894



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult 
removeNamedTopology(final String topologyToRemo
 
 topologyMetadata.unregisterTopology(removeTopologyFuture, 
topologyToRemove);
 
-if (resetOffsets) {
+if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing 
topology") && resetOffsets) {
 log.info("Resetting offsets for the following partitions of {} 
removed NamedTopology {}: {}",
  removeTopologyFuture.isCompletedExceptionally() ? 
"unsuccessfully" : "successfully",
  topologyToRemove, partitionsToReset
 );
-if (!partitionsToReset.isEmpty()) {

Review comment:
   The offset reset code is pretty long so I pulled it out into its own 
method to clean things up a bit

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult 
removeNamedTopology(final String topologyToRemo
 
 topologyMetadata.unregisterTopology(removeTopologyFuture, 
topologyToRemove);
 
-if (resetOffsets) {
+if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing 
topology") && resetOffsets) {
 log.info("Resetting offsets for the following partitions of {} 
removed NamedTopology {}: {}",
  removeTopologyFuture.isCompletedExceptionally() ? 
"unsuccessfully" : "successfully",
  topologyToRemove, partitionsToReset
 );
-if (!partitionsToReset.isEmpty()) {
-removeTopologyFuture.whenComplete((v, throwable) -> {
-if (throwable != null) {
-removeTopologyFuture.completeExceptionally(throwable);
-}
-DeleteConsumerGroupOffsetsResult deleteOffsetsResult = 
null;
-while (deleteOffsetsResult == null) {
-try {
-deleteOffsetsResult = 
adminClient.deleteConsumerGroupOffsets(
-
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), 
partitionsToReset);
-deleteOffsetsResult.all().get();
-} catch (final InterruptedException ex) {
+resetOffsets(removeTopologyFuture, partitionsToReset);
+}
+return new RemoveNamedTopologyResult(removeTopologyFuture);
+}
+
+/**
+ * @return  true iff the application is still in CREATED and the future 
was completed
+ */
+private synchronized boolean completedFutureForUnstartedApp(final 
KafkaFutureImpl updateTopologyFuture,

Review comment:
   This is the main fix
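   
   For readers following along, a rough sketch of what a helper like this could 
look like, assuming it short-circuits futures for an app that was never started 
(an assumption for illustration, not the PR's actual body):
   ```java
   private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl<Void> updateTopologyFuture,
                                                               final String operation) {
       // If the app is still in CREATED, no rebalance will ever complete this future,
       // so complete it now to keep get() from blocking forever.
       if (state() == State.CREATED) {
           updateTopologyFuture.complete(null);
           return true;
       }
       return false;
   }
   ```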




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815270807



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) {
 /**
  * Start up Streams with a collection of initial NamedTopologies (may be 
empty)
  */
-public void start(final Collection initialTopologies) {
+public synchronized void start(final Collection 
initialTopologies) {

Review comment:
   `super.start()` is already synchronized but we should just go ahead and 
synchronize at the first layer

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -145,7 +145,7 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 /**
  * @return the NamedTopology for the specific name, or Optional.empty() if 
the application has no NamedTopology of that name
  */
-public Optional getTopologyByName(final String name) {
+public synchronized Optional getTopologyByName(final String 
name) {

Review comment:
   We should make sure this is thread safe, since it's how we check that a name 
isn't already in use when trying to add a new topology.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED

2022-02-25 Thread GitBox


ableegoldman opened a new pull request #11813:
URL: https://github.com/apache/kafka/pull/11813


   Currently the #add/removeNamedTopology APIs behave a little wonky when the 
application is still in CREATED. Since adding and removing topologies runs some 
validation steps, there is a valid reason to want to add or remove a topology on 
a dummy app that you don't plan to start, or on a real app that you haven't 
started yet. But to actually check the results of the validation you need to 
call `get()` on the future, so we need to make sure that `get()` won't block 
forever when there is no failure -- as it currently does.
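   
   A hypothetical illustration of the usage this is meant to support (class and 
method names are assumed from the APIs referenced above, not taken from this PR):
   ```java
   // Validate a topology against an app that will never be started.
   static void validateOnly(final KafkaStreamsNamedTopologyWrapper app,
                            final NamedTopology topology) throws Exception {
       final AddNamedTopologyResult result = app.addNamedTopology(topology);
       // Before this fix, get() could block forever here because the app stays in CREATED
       // and no rebalance ever completes the future; now it should return (or throw) promptly.
       result.all().get();
   }
   ```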


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] justinrlee commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured

2022-02-25 Thread GitBox


justinrlee commented on pull request #11811:
URL: https://github.com/apache/kafka/pull/11811#issuecomment-1051595309


   (Also, I'm not sure if we'd want a similar PR to the 3.1 branch)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being 
processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+private final boolean hasNamedTopologies;
+// map of topologies experiencing errors/currently under backoff
+private final ConcurrentHashMap 
topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+public TaskExecutionMetadata(final Set allTopologyNames) {
+this.hasNamedTopologies = !(allTopologyNames.size() == 1 && 
allTopologyNames.contains(UNNAMED_TOPOLOGY));
+}
+
+public boolean canProcessTask(final Task task, final long now) {
+final String topologyName = task.id().topologyName();
+if (!hasNamedTopologies) {
+// TODO implement error handling/backoff for non-named topologies 
(needs KIP)
+return true;
+} else {
+final NamedTopologyMetadata metadata = 
topologyNameToErrorMetadata.get(topologyName);
+return metadata == null || (metadata.canProcess() && 
metadata.canProcessTask(task, now));
+}
+}
+
+public void registerTaskError(final Task task, final Throwable t, final 
long now) {
+if (hasNamedTopologies) {
+final String topologyName = task.id().topologyName();
+topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new 
NamedTopologyMetadata(topologyName))
+.registerTaskError(task, t, now);
+}
+}
+
+class NamedTopologyMetadata {
+private final Logger log;
+private final Map tasksToErrorTime = new 
ConcurrentHashMap<>();
+
+public NamedTopologyMetadata(final String topologyName) {
+final LogContext logContext = new 
LogContext(String.format("topology-name [%s] ", topologyName));
+this.log = logContext.logger(NamedTopologyMetadata.class);
+}
+
+public boolean canProcess() {
+// TODO: during long task backoffs, pause the full topology to 
avoid it getting out of sync
+return true;
+}
+
+public boolean canProcessTask(final Task task, final long now) {
+// TODO: implement exponential backoff, for now we just wait 15s
+final Long errorTime = tasksToErrorTime.get(task.id());
+if (errorTime == null) {
+return true;
+} else if (now - errorTime > 15000L) {

Review comment:
   Because it was actually taking the thread 10s to come back up (in the 
integration test where we overrode `session.timeout` to 10s) before we had 
https://github.com/apache/kafka/pull/11801
   
   Now with that fix it takes .5 - 4s for the thread to be replaced, so there's 
no particular reason to have it be 15s. I think it makes sense to lower it to 
maybe 5s for now, and then when we have the true exponential backoff obviously 
it can start lower and grow from there.
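   
   For concreteness, a minimal sketch of the check with a named 5s constant (the 
constant name and the reset-on-expiry behavior are assumptions, not the actual 
code):
   ```java
   private static final long CONSTANT_BACKOFF_MS = 5_000L;

   public boolean canProcessTask(final Task task, final long now) {
       final Long errorTime = tasksToErrorTime.get(task.id());
       if (errorTime == null) {
           return true;                            // no recent error for this task
       } else if (now - errorTime > CONSTANT_BACKOFF_MS) {
           tasksToErrorTime.remove(task.id());     // backoff elapsed, resume processing
           return true;
       } else {
           return false;                           // still inside the backoff window
       }
   }
   ```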




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11812: KAFKA-12738: address minor followup and consolidate integration tests of PR #11787

2022-02-25 Thread GitBox


ableegoldman opened a new pull request #11812:
URL: https://github.com/apache/kafka/pull/11812


   This PR addresses the remaining nits from the final review of 
https://github.com/apache/kafka/pull/11787
   
   It also deletes two integration test classes which each had only one test in 
them, and moves those tests into another test class file to save the time of 
bringing up an entire embedded Kafka cluster just for a single test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815265785



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public TestName testName = new TestName();
+
+private final String testId = safeUniqueTestName(getClass(), testName);
+private final String appId = "appId_" + testId;
+private final Properties properties = props();
+
+// Task 0
+private final String inputTopic = "input" + testId;
+private final String outputTopic = "output" + testId;
+// Task 1
+private final String errorInputTopic = "error-input" + testId;
+private final String errorOutputTopic = "error-output" + testId;
+
+@Before
+public void setup() {
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, 
errorOutputTopic, inputTopic, outputTopic);
+}
+
+private Properties props() {
+return mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath()),
+mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class),
+mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class),
+mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1),
+mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1)
+)
+);
+}
+
+@Test
+public void shouldBackOffTaskAndEmitDataWithinSameTopology() 

[GitHub] [kafka] justinrlee opened a new pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured

2022-02-25 Thread GitBox


justinrlee opened a new pull request #11811:
URL: https://github.com/apache/kafka/pull/11811


   Single-line change to `build.gradle` to render javadocs for new 
`org.apache.kafka.common.security.oauthbearer.secured` package (part of 
[KIP-768](https://issues.apache.org/jira/browse/KAFKA-13202))
   
   cc @junrao 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being 
processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+private final boolean hasNamedTopologies;
+// map of topologies experiencing errors/currently under backoff
+private final ConcurrentHashMap 
topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+public TaskExecutionMetadata(final Set allTopologyNames) {
+this.hasNamedTopologies = !(allTopologyNames.size() == 1 && 
allTopologyNames.contains(UNNAMED_TOPOLOGY));
+}
+
+public boolean canProcessTask(final Task task, final long now) {
+final String topologyName = task.id().topologyName();
+if (!hasNamedTopologies) {
+// TODO implement error handling/backoff for non-named topologies 
(needs KIP)
+return true;
+} else {
+final NamedTopologyMetadata metadata = 
topologyNameToErrorMetadata.get(topologyName);
+return metadata == null || (metadata.canProcess() && 
metadata.canProcessTask(task, now));
+}
+}
+
+public void registerTaskError(final Task task, final Throwable t, final 
long now) {
+if (hasNamedTopologies) {
+final String topologyName = task.id().topologyName();
+topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new 
NamedTopologyMetadata(topologyName))
+.registerTaskError(task, t, now);
+}
+}
+
+class NamedTopologyMetadata {
+private final Logger log;
+private final Map tasksToErrorTime = new 
ConcurrentHashMap<>();
+
+public NamedTopologyMetadata(final String topologyName) {
+final LogContext logContext = new 
LogContext(String.format("topology-name [%s] ", topologyName));
+this.log = logContext.logger(NamedTopologyMetadata.class);
+}
+
+public boolean canProcess() {
+// TODO: during long task backoffs, pause the full topology to 
avoid it getting out of sync
+return true;
+}
+
+public boolean canProcessTask(final Task task, final long now) {
+// TODO: implement exponential backoff, for now we just wait 15s
+final Long errorTime = tasksToErrorTime.get(task.id());
+if (errorTime == null) {
+return true;
+} else if (now - errorTime > 15000L) {

Review comment:
   Because it was actually taking the thread 10s to come back up (in the 
integration test where we overrode `session.timeout` to 10s) before we had 
https://github.com/apache/kafka/pull/11801
   
   Now with that fix it takes under .5s for the thread to be replaced, so 
there's no particular reason to have it be 15s. I think it makes sense to lower 
it to maybe 5s for now, and then when we have the true exponential backoff 
obviously it can start lower and grow from there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


showuon commented on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1051554030


   > However, after `valuesWithPrefixOverride.get("sasl.mechanism")`, adding 
`assertFalse(config.unknown().contains("prefix.sasl.mechanism"));` makes the 
testCase fail to verify, because unknownKeys is only affected by originals and 
values; its value is originalKeys.removeAll(valueKeys).
   
   So, could we also remove the `used` config keys from the `unknown` configs? 
Otherwise, there would be this strange case where a config is `used` but 
`unknown` to Kafka.
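   
   In other words, something along these lines (a sketch of the suggestion, set 
names assumed):
   ```java
   Set<String> unknownKeys = new HashSet<>(originalKeys);
   unknownKeys.removeAll(valueKeys); // current behavior: drop keys Kafka defines
   unknownKeys.removeAll(usedKeys);  // proposed: a key that was used should never be reported as unknown
   ```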
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13514) Flakey test StickyAssignorTest

2022-02-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498387#comment-17498387
 ] 

Luke Chen edited comment on KAFKA-13514 at 2/26/22, 3:38 AM:
-

[~ableegoldman] ,

I agree with you that if this actually results in the two consumers taking 
ownership of the same partition at the same time, that would be a severe bug, 
and we should fix it soon!

I made this assumption because of the log output in this Jira ticket's 
description. However, I have also seen this test fail in Jenkins after this 
ticket was opened, with no error logs similar to the ones this ticket showed; it 
was just slow. I'm still waiting to see a failed case so I can check the 
complete logs where there is an assignment error.

> I'm pretty sure we haven't touched the general assignment algorithm

Right, we didn't touch or change the general assignment algorithm, so this bug 
has been there since the algorithm was first written.

 

Do you have the failed test jenkins link for me to check the complete log?

Thank you.


was (Author: showuon):
[~ableegoldman] , 

I agree with you that if this actually results in the two consumers taking 
ownership of the same partition at the same time, that would be a severe bug, 
and we should fix it soon!

I made this assumption because of the log output in this Jira ticket's 
description. However, I have also seen this failure after this ticket was 
opened, with no error logs similar to the ones this ticket showed; it was just 
slow. I'm still waiting to see a failed case and check the complete logs.

> I'm pretty sure we haven't touched the general assignment algorithm

Right, we didn't touch or change the general assignment algorithm, so this bug 
has been there since the algorithm was first written.

 

Do you have the failed test jenkins link for me to check the complete log?

Thank you.

> Flakey test StickyAssignorTest
> --
>
> Key: KAFKA-13514
> URL: https://issues.apache.org/jira/browse/KAFKA-13514
> Project: Kafka
>  Issue Type: Test
>  Components: clients, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
> No real stack trace, but only:
> {quote}java.util.concurrent.TimeoutException: 
> testLargeAssignmentAndGroupWithNonEqualSubscription() timed out after 60 
> seconds{quote}
> STDOUT
> {quote}[2021-12-07 01:32:23,920] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,964] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,976] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13514) Flakey test StickyAssignorTest

2022-02-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498387#comment-17498387
 ] 

Luke Chen commented on KAFKA-13514:
---

[~ableegoldman] , 

I agree with you that if this actually results in the two consumers taking 
ownership of the same partition at the same time, that would be a severe bug, 
and we should fix it soon!

I made this assumption because of the log output in this Jira ticket's 
description. However, I have also seen this failure after this ticket was 
opened, with no error logs similar to the ones this ticket showed; it was just 
slow. I'm still waiting to see a failed case and check the complete logs.

> I'm pretty sure we haven't touched the general assignment algorithm

Right, we didn't touch or change the general assignment algorithm, so this bug 
has been there since the algorithm was first written.

 

Do you have the failed test jenkins link for me to check the complete log?

Thank you.

> Flakey test StickyAssignorTest
> --
>
> Key: KAFKA-13514
> URL: https://issues.apache.org/jira/browse/KAFKA-13514
> Project: Kafka
>  Issue Type: Test
>  Components: clients, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
> No real stack trace, but only:
> {quote}java.util.concurrent.TimeoutException: 
> testLargeAssignmentAndGroupWithNonEqualSubscription() timed out after 60 
> seconds{quote}
> STDOUT
> {quote}[2021-12-07 01:32:23,920] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,964] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,976] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts

2022-02-25 Thread GitBox


showuon commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r815260473



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -205,25 +208,31 @@
 /** metric.reporters */
 public static final String METRIC_REPORTER_CLASSES_CONFIG = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
+// max.in.flight.requests.per.connection should be less than or equal to 5 
when idempotence producer enabled to ensure message ordering
+private static final int 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+
 /** max.in.flight.requests.per.connection */
 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 
"max.in.flight.requests.per.connection";
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
 + 
" Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of"
-+ 
" message re-ordering after a failed send due to retries (i.e., if retries are 
enabled).";
-// max.in.flight.requests.per.connection should be less than or equal to 5 
when idempotence producer enabled to ensure message ordering
-private static final int 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
++ 
" message re-ordering after a failed send due to retries (i.e., if retries are 
enabled)."
++ 
" Note additionally that enabling idempotence requires this config value to be 
less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE 
+ "."
++ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled.";
 
 /** retries */
 public static final String RETRIES_CONFIG = 
CommonClientConfigs.RETRIES_CONFIG;
 private static final String RETRIES_DOC = "Setting a value greater than 
zero will cause the client to resend any record whose send fails with a 
potentially transient error."
 + " Note that this retry is no different than if the client resent 
the record upon receiving the error."
-+ " Allowing retries without setting " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change 
the"
++ " Allowing retries and disabling enable.idempotence 
but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to 1 will potentially change the"

Review comment:
   side fix: currently, we said:
   ```
   Allowing retries without setting max.in.flight.requests.per.connection to 1 
will potentially change the ordering of records
   ```
   which is not quite right, because it was written before we had idempotence.
   Updated to:
   ```
   Allowing retries and disabling enable.idempotence but without setting 
max.in.flight.requests.per.connection to 1 will potentially change the ordering 
of records
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts

2022-02-25 Thread GitBox


showuon commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r815260093



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -461,27 +467,53 @@ private void 
postProcessAndValidateIdempotenceConfigs(final Map
 final Map originalConfigs = this.originals();
 final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
 configs.put(ACKS_CONFIG, acksStr);
-
-// For idempotence producers, values for `RETRIES_CONFIG` and 
`ACKS_CONFIG` need validation
-if (idempotenceEnabled()) {
-boolean userConfiguredRetries = 
originalConfigs.containsKey(RETRIES_CONFIG);
-if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-throw new ConfigException("Must set " + 
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent 
producer.");
+final boolean userConfiguredIdempotence = 
this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+boolean idempotenceEnabled = 
this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+boolean shouldDisableIdempotence = false;
+
+// For idempotence producers, values for `retries` and `acks` and 
`max.in.flight.requests.per.connection` need validation
+if (idempotenceEnabled) {
+final int retries = this.getInt(RETRIES_CONFIG);
+if (retries == 0) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + RETRIES_CONFIG + " 
to non-zero when using the idempotent producer.");
+}
+log.info("Idempotence will be disabled because {} is set to 
0.", RETRIES_CONFIG, retries);
+shouldDisableIdempotence = true;
 }
 
-boolean userConfiguredAcks = 
originalConfigs.containsKey(ACKS_CONFIG);
 final short acks = Short.valueOf(acksStr);
-if (userConfiguredAcks && acks != (short) -1) {
-throw new ConfigException("Must set " + ACKS_CONFIG + " to all 
in order to use the idempotent " +
+if (acks != (short) -1) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + ACKS_CONFIG + " to 
all in order to use the idempotent " +
 "producer. Otherwise we cannot guarantee 
idempotence.");
+}
+log.info("Idempotence will be disabled because {} is set to 
{}, not set to 'all'.", ACKS_CONFIG, acks);
+shouldDisableIdempotence = true;
 }
 
-boolean userConfiguredInflightRequests = 
originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-if (userConfiguredInflightRequests && 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < 
this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-throw new ConfigException("Must set " + 
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+final int inFlightConnection = 
this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < 
inFlightConnection) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
 " to use the idempotent producer.");
+}
+log.warn("Idempotence will be disabled because {} is set to 
{}, which is greater than 5. " +
+"Please note that in v4.0.0 and onward, this will become 
an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+shouldDisableIdempotence = true;
 }
 }
+
+if (shouldDisableIdempotence) {
+configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+}
+
+// validate `transaction.id` after validating idempotence dependant 
configs because `enable.idempotence` config might be overridden
+idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence;

Review comment:
   Good suggestion! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11810: KAFKA-13281: can remove topologies while in a created state

2022-02-25 Thread GitBox


ableegoldman commented on pull request #11810:
URL: https://github.com/apache/kafka/pull/11810#issuecomment-1051526955


   Merged to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #11810: KAFKA-13281: can remove topologies while in a created state

2022-02-25 Thread GitBox


ableegoldman merged pull request #11810:
URL: https://github.com/apache/kafka/pull/11810


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11810: KAFKA-13281: can remove topologies while in a created state

2022-02-25 Thread GitBox


ableegoldman commented on pull request #11810:
URL: https://github.com/apache/kafka/pull/11810#issuecomment-1051525618


   All test failures are unrelated, merging to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13514) Flakey test StickyAssignorTest

2022-02-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498386#comment-17498386
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13514:


Hey [~showuon] just checking in since I saw this fail again, you're saying 
there's a bug in the generalAssign case that's causing it to assign the same 
partition to more than one consumer? If so, what's happening in this case – do 
we actually catch this bug somewhere in the assignor or coordinator, or would 
this actually result in the two consumers taking ownership of the same 
partition at the same time?

If the latter, that's a pretty severe bug and we should absolutely be trying to 
get a fix in possibly as a blocker for the next release (of course AFAICT the 
general case where consumers have unequal subscriptions is extremely rare, but 
it's still a correctness bug and thus critical).

Do you have any sense of whether this was a newly introduced bug or whether it 
has been there since the sticky assignment algorithm was first written? I'm 
pretty sure we haven't touched the general assignment algorithm, so my guess is 
it was always there, and thus maybe not a blocker, but just wondering.

> Flakey test StickyAssignorTest
> --
>
> Key: KAFKA-13514
> URL: https://issues.apache.org/jira/browse/KAFKA-13514
> Project: Kafka
>  Issue Type: Test
>  Components: clients, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
> No real stack trace, but only:
> {quote}java.util.concurrent.TimeoutException: 
> testLargeAssignmentAndGroupWithNonEqualSubscription() timed out after 60 
> seconds{quote}
> STDOUT
> {quote}[2021-12-07 01:32:23,920] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,964] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150) 
> [2021-12-07 01:32:58,976] ERROR Found multiple consumers consumer1 and 
> consumer2 claiming the same TopicPartition topic-0 in the same generation -1, 
> this will be invalidated and removed from their previous assignment. 
> (org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor:150){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ableegoldman merged pull request #11808: KAFKA-13281: list all named topologies

2022-02-25 Thread GitBox


ableegoldman merged pull request #11808:
URL: https://github.com/apache/kafka/pull/11808


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lihaosky commented on pull request #11804: KAFKA-13542: add rebalance reason in Kafka Streams

2022-02-25 Thread GitBox


lihaosky commented on pull request #11804:
URL: https://github.com/apache/kafka/pull/11804#issuecomment-1051403295


   Hi @showuon , I saw a lot of checks fail here and in other merged PRs. Is 
`tests/Build/ARM` the only relevant test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away

2022-02-25 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498369#comment-17498369
 ] 

RivenSun commented on KAFKA-13694:
--

Hi [~guozhang],

I went through the code again. I agree with you that recordErrors are already 
stored in the ProduceResponse.
It is very likely that when the KafkaProducer side processes the produce 
response, it does not return recordErrors to the user's Callback.

I am out today; I will analyze the reason further later.
Thanks.

> Some InvalidRecordException messages are thrown away
> 
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> Through the logs of the producer and server, we do not know the reason for 
> the failure of sending, only that the message was rejected by the server.
> You can compare the RecordTooLargeException testCase, we can clearly know the 
> reason for the failure from the producer, and the server will not print the 
> log (the reason will be explained later)
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyze the validateMessagesAndAssignOffsets method,
> In the LogValidator#validateRecord method, validateKey and validateTimestamp 
> are called, and the error information of all messages is obtained: 
> Seq[ApiRecordError];
> In the subsequent processRecordErrors(recordErrors) method, currently only 
> special processing is done for Errors.INVALID_TIMESTAMP, because the ERROR 
> returned by validateKey is still the ordinary Errors.INVALID_RECORD, so the 
> code will run to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
> "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information of each 
> recordError, but we did not put the errors information into the message of 
> InvalidRecordException.
> 2).The exception thrown by processRecordErrors will be caught by 
> ReplicaManager#appendToLocalLog(...), we continue to analyze the 
> `catchException code` of appendToLocalLog.
> Here, we can know the RecordTooLargeException, why the server does not print 
> the log.
> Under case rve: RecordValidationException,
> The server prints the log: processFailedRecord method, 
> and sends a response to the client: LogAppendResult method
> In these two methods, we can find that we only use rve.invalidException,
> For rve.recordErrors, the server neither prints it nor returns it to the 
> client.
> 3.Solution
> Two solutions, I prefer the second
> 1)Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns 
> Errors.INVALID_RECORD_WITHOUT_KEY,
> In the processRecordErrors method, also do special processing for 
> Errors.INVALID_RECORD_WITHOUT_KEY
> 2)Modify the logic of the processRecordErrors method, no longer distinguish 
> the types of Errors, and {*}Even if new INVALID_RECORD types will be added in 
> the future{*}, we uniformly return:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()), 
> errors) {code}
> Also need to add toString() method for ProduceResponse.RecordError class
> {code:java}
> @Override
> public String toString() {
> return "RecordError("
> + "batchIndex=" + batchIndex
> + ", 

[GitHub] [kafka] wcarlson5 opened a new pull request #11810: KAFKA-13281: can remove topologies while in a created state

2022-02-25 Thread GitBox


wcarlson5 opened a new pull request #11810:
URL: https://github.com/apache/kafka/pull/11810


   We should be able to change the topologies while the app is in the CREATED 
state, including removing them.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away

2022-02-25 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498364#comment-17498364
 ] 

RivenSun commented on KAFKA-13694:
--

Hi [~guozhang] 

Thank you for your reply.
The KafkaProducer code I tested locally is as follows:
{code:java}
ProducerRecord record = new ProducerRecord("rivenTest4",
        System.currentTimeMillis() + value);
Callback callback = new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            LOG.error("the producer has a error:" + e.getMessage());
        else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
            System.out.println("The partition of the record we just sent is: " + metadata.partition());
        }
    }
};
producer.send(record, callback); {code}
 

The KafkaProducer does not know the specific reason why the send failed.

producer.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR 
us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or 
more records have been rejected {code}


You can read the source code of the catchException at the end of the 
ReplicaManager#appendToLocalLog(...) method
{code:java}
catch {
  // NOTE: Failed produce requests metric is not incremented for known exceptions
  // it is supposed to indicate un-expected failures of a broker in handling a produce request
  case e@ (_: UnknownTopicOrPartitionException |
           _: NotLeaderOrFollowerException |
           _: RecordTooLargeException |
           _: RecordBatchTooLargeException |
           _: CorruptRecordException |
           _: KafkaStorageException) =>
    (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
  case rve: RecordValidationException =>
    val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
    val recordErrors = rve.recordErrors
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
      logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
  case t: Throwable =>
    val logStartOffset = processFailedRecord(topicPartition, t)
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
} {code}
In case rve: RecordValidationException, the server only uses rve.invalidException when printing the log and *filling the exception* field in LogAppendResult. I understand that rve.recordErrors is indeed used when constructing the LogAppendInfo field of LogAppendResult, but the Kafka client never sees the recordErrors carried by LogAppendInfo.

You can refer to the following code
{code:java}
case e@ (_: UnknownTopicOrPartitionException |
         _: NotLeaderOrFollowerException |
         _: RecordTooLargeException |
         _: RecordBatchTooLargeException |
         _: CorruptRecordException |
         _: KafkaStorageException) =>
  (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) {code}
In this code, the server does not pass in the recordErrors parameter when 
constructing LogAppendInfo.UnknownLogAppendInfo, but the producer can still 
know the specific failure reason.
producer2.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR 
us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
request included a message larger than the max message size the server will 
accept. {code}
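
As a side note, a minimal variant of the callback above that also logs the exception class makes the two cases easier to tell apart on the client side. This is only a sketch that reuses the LOG, record and producer objects from the snippet above; nothing Kafka-specific is assumed beyond the public Callback interface:
{code:java}
Callback callback = (metadata, e) -> {
    if (e != null) {
        // e.g. InvalidRecordException vs. RecordTooLargeException
        LOG.error("send failed: " + e.getClass().getSimpleName() + " - " + e.getMessage());
    } else {
        LOG.info("record written to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
};
producer.send(record, callback); {code}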
 

 

> Some InvalidRecordException messages are thrown away
> 
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> Through the logs of the producer and server, we do not know the reason for 
> the failure of sending, only that the message was rejected by the server.
> You can compare the RecordTooLargeException testCase, we can clearly know the 
> reason for the failure from the producer, and the server will not print 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11808: KAFKA-13281: list all named topologies

2022-02-25 Thread GitBox


wcarlson5 commented on a change in pull request #11808:
URL: https://github.com/apache/kafka/pull/11808#discussion_r815227626



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -149,6 +149,10 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 return 
Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
 }
 
+public Collection getAllTopologies() {
+return getAllTopologies();

Review comment:
   oops. I thought I was calling that on topology metadata. Fixed!
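   
   For anyone skimming the thread: the diff above returns getAllTopologies() from itself, so it recurses until the stack overflows; the fix is to delegate to the metadata object instead. A sketch of the intended shape (the accessor name on topologyMetadata is an assumption, not the actual TopologyMetadata API):
   ```java
   public Collection<NamedTopology> getAllTopologies() {
       return topologyMetadata.allNamedTopologies();   // hypothetical accessor name
   }
   ```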




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


cmccabe merged pull request #11806:
URL: https://github.com/apache/kafka/pull/11806


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11809: MINOR: create KafkaConfigSchema and TimelineObject

2022-02-25 Thread GitBox


cmccabe opened a new pull request #11809:
URL: https://github.com/apache/kafka/pull/11809


   Create KafkaConfigSchema to encapsulate the concept of determining the types 
of configuration keys.
   This is useful in the controller because we can't import KafkaConfig, which 
is part of core. Also
   introduce the TimelineObject class, which is a more generic version of 
TimelineInteger /
   TimelineLong.
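   
   As a rough illustration of the "more generic version" idea, something in the spirit of the sketch below (invented for illustration; this is not the actual TimelineObject class or API):
   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   
   // A snapshot-aware holder for an arbitrary value type, analogous to what
   // TimelineInteger/TimelineLong do for primitives. All names here are made up.
   class SnapshottableValue<T> {
       private final NavigableMap<Long, T> priorValues = new TreeMap<>();
       private T current;
   
       SnapshottableValue(T initial) {
           this.current = initial;
       }
   
       // change the value while the snapshot taken at `epoch` is still live
       void set(long epoch, T value) {
           priorValues.putIfAbsent(epoch, current);  // remember what that snapshot saw
           current = value;
       }
   
       T get() {
           return current;
       }
   
       // value as it was when the snapshot at `epoch` was taken
       T get(long epoch) {
           Map.Entry<Long, T> e = priorValues.ceilingEntry(epoch);
           return e == null ? current : e.getValue();
       }
   }
   ```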


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11808: KAFKA-13281: list all named topologies

2022-02-25 Thread GitBox


ableegoldman commented on a change in pull request #11808:
URL: https://github.com/apache/kafka/pull/11808#discussion_r815205312



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -149,6 +149,10 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 return 
Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
 }
 
+public Collection getAllTopologies() {
+return getAllTopologies();

Review comment:
   this looks like an infinite loop?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name

2022-02-25 Thread GitBox


cmccabe commented on pull request #11807:
URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051328712


   @hachikuji : looks like `AuthorizerIntegrationTest#testHostAddressBasedAcls` 
needs to be fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name

2022-02-25 Thread GitBox


cmccabe commented on pull request #11807:
URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051328911


   ```
   [2022-02-25T22:07:29.188Z] AuthorizerIntegrationTest > 
testHostAddressBasedAcls(String) > 
kafka.api.AuthorizerIntegrationTest.testHostAddressBasedAcls(String)[2] FAILED
   [2022-02-25T22:07:29.188Z] org.opentest4j.AssertionFailedError: 
expected:  but was: 
   [2022-02-25T22:07:29.188Z] at 
app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
   [2022-02-25T22:07:29.188Z] at 
app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
   [2022-02-25T22:07:29.188Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   [2022-02-25T22:07:29.188Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   [2022-02-25T22:07:29.188Z] at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
   [2022-02-25T22:07:29.188Z] at 
app//kafka.api.AuthorizerIntegrationTest.testHostAddressBasedAcls(AuthorizerIntegrationTest.scala:2434)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #11808: KAFKA-13281: list all named topologies

2022-02-25 Thread GitBox


wcarlson5 opened a new pull request #11808:
URL: https://github.com/apache/kafka/pull/11808


   List all the named topologies that have been added to this client
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815182457



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+@Mock

Review comment:
   oh good call - the mock exists because I was doing something else. Deleting this line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name

2022-02-25 Thread GitBox


hachikuji commented on pull request #11807:
URL: https://github.com/apache/kafka/pull/11807#issuecomment-1051297524


   The new integration test is not working on Jenkins. I was afraid of that. I'll have to see if there's a better approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815174043



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable at least once, optionally retrying the 
callable if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If all retries are exhausted,
+ * then the last exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
+ *
+ * If maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable the function to execute.
+ * @param maxRetries maximum number of retries; must be 0 or more
+ * @param retryBackoffMs the number of milliseconds to delay upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException} before 
retrying again; must be 0 or more
+ *

Review comment:
   Nit on the spacing so the description of parameters is column-aligned.
   ```suggestion
* @param callable   the function to execute.
* @param maxRetries maximum number of retries; must be 0 or more
* @param retryBackoffMs the number of milliseconds to delay upon 
receiving a
*   {@link 
org.apache.kafka.connect.errors.RetriableException} before retrying again; 
*   must be 0 or more
   ```

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+@Mock

Review comment:
   Do we need this line? I think not since you're instantiating 
`mockCallable` in `setUp()`. The annotation is really only needed when the 
Mockito JUnit runner is used to initialize the mock fields.
   
   Also, IIRC we can get rid of the `@RunWith(PowerMockRunner.class)` line as well, 
since this code is not really using anything from PowerMock. It'd be great if 
we could avoid using PowerMock in new code.
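   
   For reference, a minimal sketch of the test class header with plain Mockito and neither annotation (the setUp body is an assumption, since it is not shown in this excerpt):
   ```java
   import static org.mockito.Mockito.mock;
   
   import java.util.concurrent.Callable;
   import org.junit.Before;
   
   public class RetryUtilTest {
   
       private Callable<String> mockCallable;
   
       @SuppressWarnings("unchecked")
       @Before
       public void setUp() {
           // plain Mockito is enough; no @Mock field, MockitoJUnitRunner or PowerMockRunner needed
           mockCallable = mock(Callable.class);
       }
   
       // ... existing @Test methods unchanged ...
   }
   ```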




-- 
This is an automated message from the Apache Git 

[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815158301



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+final long maxAttempts = maxRetries + 1;
+while (++attempt <= maxAttempts) {
+try {
+return callable.call();
+} catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+"Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+lastError = e;
+} catch (WakeupException e) {
+lastError = e;
+} catch (Exception e) {
+log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+throw e;
+}
+Utils.sleep(retryBackoffMs);

Review comment:
   Yeah, I think it's worth the bit of logic to fail more quickly.
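   
   For illustration, one shape that logic could take (a sketch against the loop quoted above, with the other catch clauses elided):
   ```java
   while (++attempt <= maxAttempts) {
       try {
           return callable.call();
       } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
           log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times",
                   attempt, maxRetries - attempt, e);
           lastError = e;
       }
       if (attempt < maxAttempts) {
           // only back off when another attempt will actually follow
           Utils.sleep(retryBackoffMs);
       }
   }
   throw new ConnectException("Fail to retry the task after " + maxRetries + " attempts.  Reason: " + lastError, lastError);
   ```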




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815158013



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+final long maxAttempts = maxRetries + 1;
+while (++attempt <= maxAttempts) {
+try {
+return callable.call();
+} catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+"Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());

Review comment:
   We probably should log the exception, in fact. Making the change.
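   
   For reference, SLF4J prints the stack trace when the Throwable is passed as the final argument after the placeholders, so the adjusted call could look like this (sketch):
   ```java
   log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times",
           attempt, maxRetries - attempt, e);
   ```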




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815156025



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+final long maxAttempts = maxRetries + 1;
+while (++attempt <= maxAttempts) {
+try {
+return callable.call();
+} catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+"Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+lastError = e;
+} catch (WakeupException e) {
+lastError = e;
+} catch (Exception e) {
+log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+throw e;
+}
+Utils.sleep(retryBackoffMs);

Review comment:
   Agreed, and I actually had the exact same thought - though I thought it would be fine to wait for an additional x amount of time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815154741



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+final long maxAttempts = maxRetries + 1;
+while (++attempt <= maxAttempts) {
+try {
+return callable.call();
+} catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+"Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+lastError = e;
+} catch (WakeupException e) {
+lastError = e;
+} catch (Exception e) {
+log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+throw e;
+}
+Utils.sleep(retryBackoffMs);
+}
+
+throw new ConnectException("Fail to retry the task after " + 
maxRetries + " attempts.  Reason: " + lastError, lastError);

Review comment:
   Agreed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815104923



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
 }
 }
 
-private void readToLogEnd() {
+/**
+ * This method finds the end offsets using the listOffsets 
method of the admin client.
+ * As the listOffsets method might throw a {@link 
RetriableException}, the shouldRetry
+ * flag enables retry, upon catching such exception, if it is set to 
True.
+ *
+ * @param shouldRetry Boolean flag to enable retry for the admin client 
listOffsets call.

Review comment:
   Maybe add:
   ```suggestion
* @param shouldRetry Boolean flag to enable retry for the admin client 
listOffsets call.
* @see TopicAdmin#retryEndOffsets
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
 }
 }
 
-private void readToLogEnd() {
+/**
+ * This method finds the end offsets using the listOffsets 
method of the admin client.
+ * As the listOffsets method might throw a {@link 
RetriableException}, the shouldRetry
+ * flag enables retry, upon catching such exception, if it is set to 
True.
+ *
+ * @param shouldRetry Boolean flag to enable retry for the admin client 
listOffsets call.
+ */
+
+private void readToLogEnd(boolean shouldRetry) {
 Set<TopicPartition> assignment = consumer.assignment();
-Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+// it will subsequently invoke the listOffsets call here

Review comment:
   I don't think this comment adds much value, especially because 
"subsequently" is ambiguous. IMO the method call stands on its own.
   ```suggestion
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+final long maxAttempts = maxRetries + 1;
+while (++attempt <= maxAttempts) {
+try {
+return callable.call();
+} catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+ 

[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

2022-02-25 Thread GitBox


C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814970233



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##
@@ -57,15 +71,46 @@
 private final Herder herder;
 private final List connectorPlugins;
 
-private static final List> CONNECTOR_EXCLUDES = 
Arrays.asList(
+static final List> CONNECTOR_EXCLUDES = 
Arrays.asList(
 VerifiableSourceConnector.class, VerifiableSinkConnector.class,
 MockConnector.class, MockSourceConnector.class, 
MockSinkConnector.class,
 SchemaSourceConnector.class
 );
 
+@SuppressWarnings("rawtypes")
+static final List> TRANSFORM_EXCLUDES = 
Collections.singletonList(
+PredicatedTransformation.class
+);
+
 public ConnectorPluginsResource(Herder herder) {
 this.herder = herder;
 this.connectorPlugins = new ArrayList<>();
+
+// TODO: improve once plugins are allowed to be added/removed during 
runtime.
+for (PluginDesc plugin : 
herder.plugins().sinkConnectors()) {
+if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+connectorPlugins.add(new ConnectorPluginInfo(plugin));
+}
+}
+for (PluginDesc plugin : 
herder.plugins().sourceConnectors()) {
+if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+connectorPlugins.add(new ConnectorPluginInfo(plugin));
+}
+}
+for (PluginDesc> transform : 
herder.plugins().transformations()) {
+if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+connectorPlugins.add(new ConnectorPluginInfo(transform));
+}
+}
+for (PluginDesc> predicate : 
herder.plugins().predicates()) {
+connectorPlugins.add(new ConnectorPluginInfo(predicate));
+}
+for (PluginDesc converter : herder.plugins().converters()) {
+connectorPlugins.add(new ConnectorPluginInfo(converter));
+}
+for (PluginDesc headerConverter : 
herder.plugins().headerConverters()) {
+connectorPlugins.add(new ConnectorPluginInfo(headerConverter));
+}

Review comment:
   Now that we have separate `Plugins::sinkConnectors` and 
`Plugins::sourceConnectors` methods, we can abstract this a little, which 
should improve readability a bit and make it easier to extend for other plugin 
types in the future:
   ```suggestion
   static final List> 
SINK_CONNECTOR_EXCLUDES = Arrays.asList(
   VerifiableSinkConnector.class, MockSinkConnector.class
   );
   
   static final List> 
SOURCE_CONNECTOR_EXCLUDES = Arrays.asList(
   VerifiableSourceConnector.class, MockSourceConnector.class, 
SchemaSourceConnector.class
   );
   
   @SuppressWarnings({"unchecked", "rawtypes"})
   static final List>> TRANSFORM_EXCLUDES 
= Collections.singletonList(
   (Class) PredicatedTransformation.class
   );
   
   public ConnectorPluginsResource(Herder herder) {
   this.herder = herder;
   this.connectorPlugins = new ArrayList<>();
   
   // TODO: improve once plugins are allowed to be added/removed during 
runtime.
   addConnectorPlugins(herder.plugins().sinkConnectors(), 
SINK_CONNECTOR_EXCLUDES);
   addConnectorPlugins(herder.plugins().sourceConnectors(), 
SOURCE_CONNECTOR_EXCLUDES);
   addConnectorPlugins(herder.plugins().transformations(), 
TRANSFORM_EXCLUDES);
   addConnectorPlugins(herder.plugins().predicates(), 
Collections.emptySet());
   addConnectorPlugins(herder.plugins().converters(), 
Collections.emptySet());
   addConnectorPlugins(herder.plugins().headerConverters(), 
Collections.emptySet());
   }
   
   private  void addConnectorPlugins(Collection> plugins, 
Collection> excludes) {
   plugins.stream()
   .filter(p -> !excludes.contains(p.pluginClass()))
   .map(ConnectorPluginInfo::new)
   .forEach(connectorPlugins::add);
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
 return keys;
 }
 
+@Override
+public List connectorPluginConfig(String pluginName) {
+List results = new ArrayList<>();
+ConfigDef configDefs;
+try {
+Plugins p = plugins();
+Object plugin = p.newPlugin(pluginName);
+PluginType pluginType = PluginType.from(plugin.getClass());
+switch (pluginType) {
+case SINK:
+case SOURCE:
+configDefs = ((Connector) plugin).config();
+break;
+case CONVERTER:
+

[GitHub] [kafka] hachikuji opened a new pull request #11807: KAFKA-13698; KRaft authorizer should use host address instead of name

2022-02-25 Thread GitBox


hachikuji opened a new pull request #11807:
URL: https://github.com/apache/kafka/pull/11807


   Use `InetAddress.getHostAddress` in `StandardAuthorizer` instead of 
`InetAddress.getHostName`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13698) KRaft authorizer should check host address instead of name

2022-02-25 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13698:
---

 Summary: KRaft authorizer should check host address instead of name
 Key: KAFKA-13698
 URL: https://issues.apache.org/jira/browse/KAFKA-13698
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


StandardAuthorizer currently uses `InetAddress.getHostName` to validate hosts 
specified by ACLs. It should use `InetAddress.getHostAddress` instead as we do 
in `AclAuthorizer`.
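
A small illustration of the difference (nothing here is specific to the authorizer):
{code:java}
InetAddress addr = InetAddress.getByName("127.0.0.1");
// getHostName() may do a reverse lookup and return a resolver-dependent name such as
// "localhost"; getHostAddress() always returns the literal IP, which is what host
// entries in ACL bindings are written against.
System.out.println(addr.getHostName());
System.out.println(addr.getHostAddress());
{code}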



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815023731



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -319,9 +319,11 @@ private void poll(long timeoutMs) {
 }
 }
 
-private void readToLogEnd() {
+private void readToLogEnd(boolean shouldRetry) {
 Set<TopicPartition> assignment = consumer.assignment();
-Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, shouldRetry);
+// will retry on RetriableExceptions

Review comment:
   Yes! Good call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-02-25 Thread GitBox


rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815017009



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * executed exactly once.  If maxRetries is set to 
n, the callable will be executed at
+ * most n + 1 times.
+ *
+ * If retryBackoffMs is set to 0, no wait will happen in 
between the retries.
+ *
+ * @param callable The task to execute.
+ * @param maxRetries Maximum number of retries.
+ * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+ * {@link org.apache.kafka.connect.errors.RetriableException}.
+ *
+ * @throws ConnectException If the task exhausted all the retries.
+ */
+public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+Throwable lastError = null;
+int attempt = 0;
+long maxAttempts = maxRetries + 1;
+while (attempt++ < maxAttempts) {

Review comment:
   Should this be:
   ```suggestion
   while (++attempt <= maxAttempts) {
   ```
   so we could use a 1-based attempt in the log message on line 58-59?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+/**
+ * The method executes the callable, and performs retries if
+ * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+ * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+ * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+ *
+ * The callable task will be executed at least once.  If 
maxRetries is set to 0, the task will be
+ * 

[GitHub] [kafka] hachikuji commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


hachikuji commented on a change in pull request #11806:
URL: https://github.com/apache/kafka/pull/11806#discussion_r815011402



##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
##
@@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception {
 withId(newBarAcl(ALTER_CONFIGS, ALLOW)));
 fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
 barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
-assertEquals(Collections.singletonList(ALLOWED),
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-Collections.singletonList(newAction(READ, TOPIC, 
"foo_";
-assertEquals(Collections.singletonList(ALLOWED),
+singletonList(newAction(READ, TOPIC, "foo_";
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, 
"fred")).build(),
-Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, 
"bar";
+singletonList(newAction(ALTER_CONFIGS, GROUP, "bar";
+}
+
+@Test
+public void testTopicAclWithOperationAll() throws Exception {
+StandardAuthorizer authorizer = new StandardAuthorizer();
+authorizer.configure(Collections.emptyMap());
+List<StandardAcl> acls = Arrays.asList(
+new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
+new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, 
ALLOW),
+new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
+);
+
+acls.forEach(acl -> {
+StandardAclWithId aclWithId = withId(acl);
+authorizer.addAcl(aclWithId.id(), aclWithId.acl());
+});
+
+assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), 
authorizer.authorize(
+newRequestContext("alice"),
+Arrays.asList(
+newAction(WRITE, TOPIC, "foo"),
+newAction(DESCRIBE_CONFIGS, TOPIC, "bar"),
+newAction(DESCRIBE, TOPIC, "baz";
+
+assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), 
authorizer.authorize(
+newRequestContext("bob"),
+Arrays.asList(
+newAction(WRITE, TOPIC, "foo"),
+newAction(READ, TOPIC, "bar"),
+newAction(DESCRIBE, TOPIC, "baz";

Review comment:
   Sorry, I think I had this as a DENY rule at one point. Will fix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


cmccabe commented on a change in pull request #11806:
URL: https://github.com/apache/kafka/pull/11806#discussion_r815011139



##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
##
@@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception {
 withId(newBarAcl(ALTER_CONFIGS, ALLOW)));
 fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
 barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
-assertEquals(Collections.singletonList(ALLOWED),
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-Collections.singletonList(newAction(READ, TOPIC, 
"foo_";
-assertEquals(Collections.singletonList(ALLOWED),
+singletonList(newAction(READ, TOPIC, "foo_";
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, 
"fred")).build(),
-Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, 
"bar";
+singletonList(newAction(ALTER_CONFIGS, GROUP, "bar";
+}
+
+@Test
+public void testTopicAclWithOperationAll() throws Exception {
+StandardAuthorizer authorizer = new StandardAuthorizer();
+authorizer.configure(Collections.emptyMap());
+List<StandardAcl> acls = Arrays.asList(

Review comment:
   Can we add a test of DENY logic when ALL is in use? Can probably just 
add an additional test assert or something
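   
   For example, something along these lines, reusing the helpers from the diff above (the exact resource and principal values are made up):
   ```java
   StandardAcl denyAll = new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, DENY);
   StandardAclWithId denyAllWithId = withId(denyAll);
   authorizer.addAcl(denyAllWithId.id(), denyAllWithId.acl());
   
   // the DENY-with-ALL rule should win over the existing ALLOW-with-ALL rule on "foo"
   assertEquals(singletonList(DENIED), authorizer.authorize(
       newRequestContext("alice"),
       singletonList(newAction(WRITE, TOPIC, "foo"))));
   ```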




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


cmccabe commented on a change in pull request #11806:
URL: https://github.com/apache/kafka/pull/11806#discussion_r815010107



##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
##
@@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception {
 withId(newBarAcl(ALTER_CONFIGS, ALLOW)));
 fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
 barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
-assertEquals(Collections.singletonList(ALLOWED),
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-Collections.singletonList(newAction(READ, TOPIC, 
"foo_";
-assertEquals(Collections.singletonList(ALLOWED),
+singletonList(newAction(READ, TOPIC, "foo_";
+assertEquals(singletonList(ALLOWED),
 authorizer.authorize(new MockAuthorizableRequestContext.Builder().
 setPrincipal(new KafkaPrincipal(USER_TYPE, 
"fred")).build(),
-Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, 
"bar";
+singletonList(newAction(ALTER_CONFIGS, GROUP, "bar";
+}
+
+@Test
+public void testTopicAclWithOperationAll() throws Exception {
+StandardAuthorizer authorizer = new StandardAuthorizer();
+authorizer.configure(Collections.emptyMap());
+List acls = Arrays.asList(
+new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
+new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, 
ALLOW),
+new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
+);
+
+acls.forEach(acl -> {
+StandardAclWithId aclWithId = withId(acl);
+authorizer.addAcl(aclWithId.id(), aclWithId.acl());
+});
+
+assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), 
authorizer.authorize(
+newRequestContext("alice"),
+Arrays.asList(
+newAction(WRITE, TOPIC, "foo"),
+newAction(DESCRIBE_CONFIGS, TOPIC, "bar"),
+newAction(DESCRIBE, TOPIC, "baz";
+
+assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), 
authorizer.authorize(
+newRequestContext("bob"),
+Arrays.asList(
+newAction(WRITE, TOPIC, "foo"),
+newAction(READ, TOPIC, "bar"),
+newAction(DESCRIBE, TOPIC, "baz";

Review comment:
   wait, why does `DESCRIBE TOPIC baz` fail for bob given that we have:
   ```
   new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, 
ALLOW)
   ```
   ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


hachikuji commented on a change in pull request #11806:
URL: https://github.com/apache/kafka/pull/11806#discussion_r815008809



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##
@@ -391,11 +392,13 @@ static AuthorizationResult findResult(Action action,
 if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) 
return null;
 break;
 default:
-if (!action.operation().equals(acl.operation())) return 
null;
+if (acl.operation() != ALL && action.operation() != 
acl.operation()) {

Review comment:
   Heh. Yeah, makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


cmccabe commented on a change in pull request #11806:
URL: https://github.com/apache/kafka/pull/11806#discussion_r815008336



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##
@@ -391,11 +392,13 @@ static AuthorizationResult findResult(Action action,
 if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) 
return null;
 break;
 default:
-if (!action.operation().equals(acl.operation())) return 
null;
+if (acl.operation() != ALL && action.operation() != 
acl.operation()) {

Review comment:
   Hmm, would it be easier to just put
   ```
   if (acl.operation() != ALL) {
   }
   ```
   
   around the whole block?
   
   So
   ```
   if (acl.operation() != ALL) {
       if (acl.permissionType().equals(ALLOW)) {
           switch stuff ...
       } else {
           if (!action.operation().equals(acl.operation())) return null;
       }
   }
   ```
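   
   Spelled out against the hunk above, that would presumably look something like the following (only the DESCRIBE_CONFIGS branch is visible in the diff, so the other cases are elided and the exact shape is an assumption):
   ```
   if (acl.operation() != ALL) {
       if (acl.permissionType().equals(ALLOW)) {
           switch (action.operation()) {
               case DESCRIBE_CONFIGS:
                   if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null;
                   break;
               // ... other ALLOW-only cases elided ...
               default:
                   if (action.operation() != acl.operation()) return null;
           }
       } else {
           if (!action.operation().equals(acl.operation())) return null;
       }
   }
   // When acl.operation() == ALL, the operation check is skipped entirely, so the ACL
   // matches any action (subject to the resource/principal/host matching done elsewhere).
   ```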
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10591: Fix minor bugs in the existing documentation

2022-02-25 Thread GitBox


junrao commented on a change in pull request #10591:
URL: https://github.com/apache/kafka/pull/10591#discussion_r815000220



##
File path: docs/ops.html
##
@@ -78,7 +78,7 @@   
auto.leader.rebalance.enable=true
 You can also set this to false, but you will then need to manually restore 
leadership to the restored replicas by running the command:
- 
bin/kafka-preferred-replica-election.sh --bootstrap-server 
broker_host:port
+ 
bin/kafka-leader-election.sh --bootstrap-server broker_host:port

Review comment:
   Yes, we did move to the kafka-leader-election tool. Could we add the 
complete command line to do preferred leader election? I think we need to add 
--election-type preferred.
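   For reference, the complete invocation would presumably be along the lines of `bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --all-topic-partitions` (the tool also expects a partition selector such as `--all-topic-partitions` or `--topic`/`--partition`; worth double-checking the exact flags against the tool's help output).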

##
File path: docs/security.html
##
@@ -2092,12 +2092,12 @@ 7.6.3 Migrating the 
ZooKeeper ensemble
 It is also necessary to enable SASL and/or mTLS authentication on the 
ZooKeeper ensemble. To do it, we need to perform a rolling restart of the 
server and set a few properties. See above for mTLS information.  Please refer 
to the ZooKeeper documentation for more detail:
 
-https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl;>Apache
 ZooKeeper documentation
+https://zookeeper.apache.org/doc/r3.5.9/zookeeperProgrammers.html#sc_ZooKeeperAccessControl;>Apache
 ZooKeeper documentation

Review comment:
   The latest ZK version for Kafka is 3.6.3 now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

2022-02-25 Thread GitBox


mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814995987



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##
@@ -79,11 +82,21 @@ public int hashCode() {
 
 @Override
 public String toString() {
-final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-sb.append("className='").append(className).append('\'');
-sb.append(", type=").append(type);
-sb.append(", version='").append(version).append('\'');
-sb.append('}');
-return sb.toString();
+return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+", type=" + type.toString() +
+", version='" + version + '\'' +
+'}';
+}
+
+public static final class NoVersionFilter {
+@Override
+public boolean equals(Object obj) {
+return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}

Review comment:
   Yes that's a good idea

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##
@@ -79,11 +82,21 @@ public int hashCode() {
 
 @Override
 public String toString() {
-final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-sb.append("className='").append(className).append('\'');
-sb.append(", type=").append(type);
-sb.append(", version='").append(version).append('\'');
-sb.append('}');
-return sb.toString();
+return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+", type=" + type.toString() +
+", version='" + version + '\'' +
+'}';
+}
+
+public static final class NoVersionFilter {
+@Override
+public boolean equals(Object obj) {
+return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}

Review comment:
   Yes that's a good idea, done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #11806: KAFKA-13697; KRaft authorizer should support AclOperation.ALL

2022-02-25 Thread GitBox


hachikuji opened a new pull request #11806:
URL: https://github.com/apache/kafka/pull/11806


   AclOperation.ALL implies all other operation types, but we are not checking 
for it in StandardAuthorizer. The patch fixes the issue and adds some test 
cases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11796: Kip 770

2022-02-25 Thread GitBox


guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1051079006


   Hello @vamossagar12 I checked out your branch and ran `streams:compileTestJava` on my local machine, and it also fails with:
   
   ```
   > Task :streams:compileTestJava
   
/Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java:95:
 warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
^
   error: warnings found and -Werror specified
   1 error
   1 warning
   
   > Task :streams:compileTestJava FAILED
   ```
   
   I guess your branch was not rebased on top of `trunk` and maybe that's why 
you did not see the failure?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13697) KRaft authorizer should handle AclOperation.ALL

2022-02-25 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13697:
---

 Summary: KRaft authorizer should handle AclOperation.ALL
 Key: KAFKA-13697
 URL: https://issues.apache.org/jira/browse/KAFKA-13697
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


AclOperation.ALL implies other permissions, but we are not currently checking 
for it in StandardAuthorizer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051077042


   @hachikuji I pushed a commit so that both controller implementations only 
persist the leader recovery state if the cluster supports leader recovery state 
(IBP is greater than 3.2). I am currently working on the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-25 Thread GitBox


junrao commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814981129



##
File path: core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
##
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import 
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, 
RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances.
+ *  - receives any leader and follower replica events and partition stop 
events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig
+ * @param brokerId
+ * @param logDir
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+   brokerId: Int,
+   logDir: String) extends Logging with Closeable with 
KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new 
ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = 
createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = 
createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = 
remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+  classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+
.getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+}
+
+AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+  private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+  override def run(): RemoteStorageManager = {
+  if (classPath != null && classPath.trim.nonEmpty) {
+val classLoader = new ChildFirstClassLoader(classPath, 
this.getClass.getClassLoader)
+val delegate = createDelegate(classLoader)
+new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+  } else {
+createDelegate(this.getClass.getClassLoader)
+  }
+  }
+})
+  }
+
+  private def configureRSM(): Unit = {
+val rsmProps = new util.HashMap[String, Any]()
+rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => 
rsmProps.put(k, v) }
+rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): 
RemoteLogMetadataManager = {
+def createDelegate(classLoader: ClassLoader) = {
+  classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+.getDeclaredConstructor()
+.newInstance()
+.asInstanceOf[RemoteLogMetadataManager]
+}
+
+AccessController.doPrivileged(new 
PrivilegedAction[RemoteLogMetadataManager] {
+  private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+  override def run(): RemoteLogMetadataManager = {
+if (classPath != null && classPath.trim.nonEmpty) {
+  val 

[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051066928


   > For this 
[comment](https://github.com/apache/kafka/pull/11733#discussion_r812044247):
   > 
   > > If the version is `0` then this is guaranteed to be the default value 
`0` so the serialization will succeed. This is true because we only write these 
values in the response when the operation success. If the operation fails then 
we skip writing these values and instead just write the error code.
   > 
   > This is a bit subjective, so feel free to disregard, but I do feel like 
some of the implicit assumptions might be causing some unnecessary obscurity. 
This is one case where a version check might actually be clearer and prevent 
the need for the extra comment.
   > 
   > A second case is implicitly setting RECOVERED in `PendingPartitionChange`. 
I had a comment about this 
[here](https://github.com/apache/kafka/pull/11733/files#r805005385), which 
might have been missed. This is fine at the moment because the current patch 
does not do any actual recovery operation, but I think we should reconsider it 
when we do. Otherwise I do think it's easy to overlook the implication when 
making other partition state changes.
   
   Okay. As you suggested, I marked the LeaderRecoveryState field for the 
AlterPartition response as ignorable. This has the same behavior we want, but 
implemented in the serialization layer instead of in the controller logic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051064227


   > Hmm, suppose we have a partition with replicas=(0) and isr=(0), but 0 is 
offline. The user chooses to reassign to node 1 and perform an unclean election 
to make it the leader. That would allow the reassignment to complete, but the 
partition state would be RECOVERING, right? How do we get out of that state?
   
   I think that is correct. I think what we can do here is make sure that the 
leader sends an AlterPartition request when it changes from RECOVERING to 
RECOVERED. I didn't implement it in this PR since this implementation is a noop 
for the recovering state. Do you mind if I implement this in a future PR? I 
filed: https://issues.apache.org/jira/browse/KAFKA-13696


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13696) Topic partition leader should always send AlterPartition when transitioning from RECOVERING to RECOVERED

2022-02-25 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13696:
--

 Summary: Topic partition leader should always send AlterPartition 
 when transitioning from RECOVERING to RECOVERED
 Key: KAFKA-13696
 URL: https://issues.apache.org/jira/browse/KAFKA-13696
 Project: Kafka
  Issue Type: Task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application

2022-02-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498236#comment-17498236
 ] 

Guozhang Wang commented on KAFKA-13681:
---

[~DrozD_0] We have not decided when it would be applied as a general 
optimization yet, but since we usually only do backports of non-critical 
changes to the past year's releases, it's less likely we are going to have new 
2.x bug-fix releases.

> Sink event duplicates for partition-stuck stream application
> 
>
> Key: KAFKA-13681
> URL: https://issues.apache.org/jira/browse/KAFKA-13681
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Mikhail Dubrovin
>Priority: Major
> Attachments: fail_topology.txt
>
>
> Hello,
> We found the following unpredictable behavior of Kafka streams:
> {code:java}
> public void buildStreams(final BuilderHelper builder) {        
> KTable table = builder.table();        
> KTable> workflowTable = 
> workflowTable(builder);        
> table
>                 .mapValues(value -> mappers.mainDTO(value))
>                 .leftJoin(workflowTable, mappers::joinWorkflows)
>                 .toStream()
>                 .map((key, value) -> KeyValue.pair(
>                         AggregateId.newBuilder().setId(value.getId()).build(),
>                         mappers.aggregateDTO(value)))
>                 .peek((k, v) -> logSinkRecord(v))
>                 .filter((id, dto) -> !isReprocessing)
>                 .to(...);
>     }    
> private static KTable> 
> workflowTable(BuilderHelper builderHelper) {
>             return builderHelper.workflowTable()
>                     .groupBy((id, workflow) -> KeyValue.pair(
>                             
> TableId.newBuilder().setId(workflow.getTableId()).build(),
>                             mappers.mapWorkflow(workflow)),
>                             Grouped.with(...))
>                     .aggregate(ArrayList::new, (key, value, agg) -> {
>                         agg.add(value);
>                         return agg;
>                     }, (key, value, agg) -> {
>                         agg.remove(value);
>                         return agg;
>                     }, Materialized.with(...));
>         } {code}
> it is a small part of our topology but it shows the error flow.
> *Data structure:*
> We have two many-partition topics: entity and workflow. Every topic is 
> represented as KTable.
> *Data error that causes application shutdown:*
> Our final event(join the entity and workflow ktables) expects a not-null 
> field in the entity but for some reason, it comes for one event. The whole 
> aggregator fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ 
> method 
> We have a health check which restarts the aggregator if it fails.
> When incorrect data comes to one partition, the partition processing is stuck 
> but other partitions are processed.
> It causes that at every restart, _workflowTable_ topology repeats 
> .aggregate() add/remove flows and puts new List into the repartition topic. 
> But offsets are not moved for processed partitions due to the aggregator's 
> shutdown.
> _This behavior generates/sinks a lot of final entity duplicates at every 
> restart because the flow is successful for data from a not-corrupted 
> partition but offsets are not moved for them._ 
> And it also causes troubles if @EqualsAndHashCode is defined to use all 
> fields to compare. At every restart, the topology tries to remove the old 
> value(not existing after the first run) and adds a new value at the end of 
> the list. The list grows after each restart(contains the same - new value 
> values).
> I also attached the topology description. To visualize: 
> [https://zz85.github.io/kafka-streams-viz/]
> *Current workaround:*
> To redefine @EqualsAndHashCode to use entities' ids only.
> *Not solved issue:*
> Sink events duplication at every restart.
> Thank you in advance!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13694) Some InvalidRecordException messages are thrown away

2022-02-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498234#comment-17498234
 ] 

Guozhang Wang commented on KAFKA-13694:
---

Hello [~RivenSun] thanks for the report. If I understand you correctly, we are 
trying to solve two issues here:

1) Have the broker print a meaningful error message when validation fails.
2) Have the broker send the error message all the way back to the client, so 
that the client can also print a meaningful message.

For 1) I think it's definitely a win, and for that purpose I think we can 
consider just replacing the `processFailedRecord(topicPartition, 
rve.invalidException)` call with `processFailedRecord(topicPartition, rve)`, 
which should then include both the invalidException and the recordErrors in 
the error message.

For 2), what I see is that today we do put the `recordErrors` as part of the 
`PartitionResponse`, hence they are encoded back to the client and surfaced as 
part of the `producer.Callback`. So if the user does implement a Callback the 
detail should be observable; or did I miss something?
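
For context, a minimal sketch of observing the send error from a producer `Callback` (the configuration values and topic name are illustrative; only the standard producer API is used):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerErrorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key, so a topic with cleanup.policy=compact rejects the record server-side.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("rivenTest4", null, "some-value");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // Today this surfaces only the top-level message, e.g.
                    // "One or more records have been rejected"; exposing the
                    // per-record detail is what this ticket asks for.
                    System.err.println("send failed: " + exception.getMessage());
                }
            });
        }
    }
}
{code}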

> Some InvalidRecordException messages are thrown away
> 
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> Through the logs of the producer and server, we do not know the reason for 
> the failure of sending, only that the message was rejected by the server.
> You can compare the RecordTooLargeException testCase, we can clearly know the 
> reason for the failure from the producer, and the server will not print the 
> log (the reason will be explained later)
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyze the validateMessagesAndAssignOffsets method,
> In the LogValidator#validateRecord method, validateKey and validateTimestamp 
> are called, and the error information of all messages is obtained: 
> Seq[ApiRecordError];
> In the subsequent processRecordErrors(recordErrors) method, currently only 
> special processing is done for Errors.INVALID_TIMESTAMP, because the ERROR 
> returned by validateKey is still the ordinary Errors.INVALID_RECORD, so the 
> code will run to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
> "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information of each 
> recordError, but we did not put the errors information into the message of 
> InvalidRecordException.
> 2).The exception thrown by processRecordErrors will be caught by 
> ReplicaManager#appendToLocalLog(...), we continue to analyze the 
> `catchException code` of appendToLocalLog.
> Here, we can know the RecordTooLargeException, why the server does not print 
> the log.
> Under case rve: RecordValidationException,
> The server prints the log: processFailedRecord method, 
> and sends a response to the client: LogAppendResult method
> In these two methods, we can find that we only use rve.invalidException,
> For rve.recordErrors, the server neither prints it nor returns it to the 
> client.
> 3.Solution
> Two solutions, I prefer the second
> 1)Similar to Errors.INVALID_TIMESTAMP, the validateKey method returns 
> Errors.INVALID_RECORD_WITHOUT_KEY,
> In the processRecordErrors method, also do special processing for 
> Errors.INVALID_RECORD_WITHOUT_KEY
> 

[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814964024



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent 
{
   override def preempt(): Unit = {}
 }
 
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
-callback: AlterIsrCallback) extends 
ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, 
LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
   Fix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814963257



##
File path: core/src/main/scala/kafka/controller/Election.scala
##
@@ -53,17 +60,17 @@ object Election {
* Elect leaders for new or offline partitions.
*
* @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples 
representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of 
tuples representing the partitions

Review comment:
   Yes. Fixed. I think I did a search and replace at some point.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-02-25 Thread GitBox


jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814961954



##
File path: core/src/main/scala/kafka/controller/Election.scala
##
@@ -40,7 +40,14 @@ object Election {
 val newLeaderAndIsrOpt = leaderOpt.map { leader =>
   val newIsr = if (isr.contains(leader)) isr.filter(replica => 
controllerContext.isReplicaOnline(replica, partition))
   else List(leader)
-  leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+  if (!isr.contains(leader)) {
+// The new leader is not in the old ISR so mark the partition a 
RECOVERING
+leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+  } else {
+// Elect a new leader but keep the previous leader recovery state

Review comment:
   Yes. The case that I had in mind is:
   1. Leader is elected using unclean leader election. E.g. leader: 1, 
recoveryState: RECOVERING
   2. Leader never sends AlterPartition and goes offline. E.g. leader: -1, 
recoveryState: RECOVERING
   3. Only ISR member (id 1) comes back online. E.g. leader:1, recoveryState: 
RECOVERING




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

2022-02-25 Thread GitBox


C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814955840



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##
@@ -79,11 +82,21 @@ public int hashCode() {
 
 @Override
 public String toString() {
-final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-sb.append("className='").append(className).append('\'');
-sb.append(", type=").append(type);
-sb.append(", version='").append(version).append('\'');
-sb.append('}');
-return sb.toString();
+return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+", type=" + type.toString() +
+", version='" + version + '\'' +
+'}';
+}
+
+public static final class NoVersionFilter {
+@Override
+public boolean equals(Object obj) {
+return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}

Review comment:
   Ah yeah, compiler warning about overriding `equals` but not `hashCode`. 
Think we should leave a comment about that just so others don't wonder the same 
thing?
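   
   A sketch of what that comment could say (assuming the class is wired up as a Jackson `JsonInclude.Include.CUSTOM` value filter, which is what the `equals`-based shape suggests; the wording is only illustrative):
   ```
   public static final class NoVersionFilter {
       // Jackson calls equals() against the field value when deciding whether to
       // serialize it; returning true here drops the placeholder "undefined" version.
       @Override
       public boolean equals(Object obj) {
           return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
       }

       // Present only to satisfy the compiler check (an error with -Werror) about
       // overriding equals() without hashCode(); it does not change behavior.
       @Override
       public int hashCode() {
           return super.hashCode();
       }
   }
   ```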




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11705: KAFKA-9847: add config to set default store type

2022-02-25 Thread GitBox


guozhangwang commented on pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#issuecomment-1051037462


   I made another pass and it LGTM. @ableegoldman would you like to make 
another pass?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11805: KAFKA-13692: include metadata wait time in total blocked time

2022-02-25 Thread GitBox


guozhangwang commented on a change in pull request #11805:
URL: https://github.com/apache/kafka/pull/11805#discussion_r814937598



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -223,7 +224,9 @@ private double totalBlockedTime(final Producer 
producer) {
 + getMetricValue(producer.metrics(), "txn-begin-time-ns-total")
 + getMetricValue(producer.metrics(), 
"txn-send-offsets-time-ns-total")
 + getMetricValue(producer.metrics(), "txn-commit-time-ns-total")
-+ getMetricValue(producer.metrics(), "txn-abort-time-ns-total");
++ getMetricValue(producer.metrics(), "txn-abort-time-ns-total")

Review comment:
   This is a meta question: since the ns and ms measurement mechanisms differ, 
if we simply sum them as `ns + ms * 10^6` the result may not be accurate. 
Probably worth checking whether the aggregated total blocked time still makes sense.
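   
   To illustrate the unit concern (the names below are placeholders, not the actual sensor names):
   ```
   // Values recorded in different units must be normalized before summing.
   double txnCommitTimeNsTotal = 42_000_000d;   // hypothetical value, already in nanoseconds
   double metadataWaitTimeMsTotal = 15d;        // hypothetical value, in milliseconds
   double totalBlockedNs = txnCommitTimeNsTotal + metadataWaitTimeMsTotal * 1_000_000d; // ms -> ns
   ```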

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##
@@ -71,6 +74,10 @@ public KafkaProducerMetrics(Metrics metrics) {
 TXN_ABORT,
 "Total time producer has spent in abortTransaction in nanoseconds."
 );
+metadataWaitSensor = newLatencySensor(
+METADATA_WAIT,
+"Total time producer has spent waiting on metadata in "

Review comment:
   This line seems incomplete?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

2022-02-25 Thread GitBox


mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1051008938


   Thanks @C0urante for the review, all good suggestions. I believe I've 
addressed them all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

2022-02-25 Thread GitBox


mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814917568



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##
@@ -79,11 +82,21 @@ public int hashCode() {
 
 @Override
 public String toString() {
-final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-sb.append("className='").append(className).append('\'');
-sb.append(", type=").append(type);
-sb.append(", version='").append(version).append('\'');
-sb.append('}');
-return sb.toString();
+return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+", type=" + type.toString() +
+", version='" + version + '\'' +
+'}';
+}
+
+public static final class NoVersionFilter {
+@Override
+public boolean equals(Object obj) {
+return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}

Review comment:
   Without this method, the compilation fails. To be honest, I'm not sure 
if there's a way to avoid it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #11777: KAFKA-10000: Add producer fencing API to admin client (KIP-618)

2022-02-25 Thread GitBox


C0urante commented on a change in pull request #11777:
URL: https://github.com/apache/kafka/pull/11777#discussion_r814866192



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FenceProducersHandler extends 
AdminApiHandler.Unbatched {
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public FenceProducersHandler(
+LogContext logContext
+) {
+this.log = logContext.logger(DescribeTransactionsHandler.class);
+this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, 
logContext);
+}
+
+public static AdminApiFuture.SimpleAdminApiFuture newFuture(
+Collection transactionalIds
+) {
+return AdminApiFuture.forKeys(buildKeySet(transactionalIds));
+}
+
+private static Set buildKeySet(Collection 
transactionalIds) {
+return transactionalIds.stream()
+.map(CoordinatorKey::byTransactionalId)
+.collect(Collectors.toSet());
+}
+
+@Override
+public String apiName() {
+return "fenceProducer";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return lookupStrategy;
+}
+
+@Override
+InitProducerIdRequest.Builder buildSingleRequest(int brokerId, 
CoordinatorKey key) {
+if (key.type != FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
+throw new IllegalArgumentException("Invalid group coordinator key 
" + key +
+" when building `InitProducerId` request");
+}
+InitProducerIdRequestData data = new InitProducerIdRequestData()
+.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
+.setProducerId(ProducerIdAndEpoch.NONE.producerId)
+.setTransactionalId(key.idValue)
+// Set transaction timeout to 1 since it's only being 
initialized to fence out older producers with the same transactional ID,
+// and shouldn't be used for any actual record writes
+.setTransactionTimeoutMs(1);
+return new InitProducerIdRequest.Builder(data);
+}
+
+@Override
+public ApiResult handleSingleResponse(
+Node broker,
+CoordinatorKey key,
+AbstractResponse abstractResponse
+) {
+InitProducerIdResponse response = (InitProducerIdResponse) 
abstractResponse;
+
+Errors error = Errors.forCode(response.data().errorCode());
+if (error != Errors.NONE) {
+return handleError(key, error);
+}
+
+Map completed = 
Collections.singletonMap(key, new ProducerIdAndEpoch(
+response.data().producerId(),
+response.data().producerEpoch()
+));
+
+return new ApiResult<>(completed, Collections.emptyMap(), 
Collections.emptyList());
+}
+
+private ApiResult 
handleError(CoordinatorKey transactionalIdKey, Errors error) {
+switch (error) {

Review comment:
   Ah, good point about extensibility. I'd personally opt to just leave a 
comment (and in 

[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2022-02-25 Thread GitBox


viktorsomogyi commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050920500


   Thank you Manikumar!
I'll rebase the change then, and will ping you when I'm done (since it is late 
Friday here, it might be on Monday).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-02-25 Thread GitBox


viktorsomogyi commented on pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1050913057


   @mimaison, just saw your comment on KAFKA-13659. I can take a look at this 
next week. @urbandan and @dorwi already have a fix for KAFKA-13659 (and thus 
this) in our distro that we wanted to contribute back. If they have time they 
may also review your solution or share their approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones

2022-02-25 Thread Marc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc updated KAFKA-13695:
-
Attachment: DeletionConceptTestbed

> Low-traffic topics don't roll (and therefore compact) nor delete tombstones
> ---
>
> Key: KAFKA-13695
> URL: https://issues.apache.org/jira/browse/KAFKA-13695
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.1.0
>Reporter: Marc
>Priority: Major
>
> I set up a testbed with some partitions and inspected carefully the behaviour 
> on the Kafka /data folder.
> It looks like when the active segment qualifies for rolling it is not 
> effectively closed until a new record arrives. Thus, it cannot be compacted in 
> a timely and deterministic manner by means of max.compaction.lag.ms, for 
> instance.
> The same problem occurs with delete.retention.ms. Once compaction has happened 
> and the canonical latest state of a key is a single tombstone on the compacted 
> tail, we must wait for an arbitrary record to arrive before deletion is 
> triggered, just as before.
> I expected the log.preallocate property to create a new segment file once the 
> current one is marked for rolling and no new records arrive, hoping it could 
> finally be rolled, but it seems to have no effect here. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones

2022-02-25 Thread Marc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc updated KAFKA-13695:
-
Attachment: (was: DeletionConceptTestbed)

> Low-traffic topics don't roll (and therefore compact) nor delete tombstones
> ---
>
> Key: KAFKA-13695
> URL: https://issues.apache.org/jira/browse/KAFKA-13695
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.1.0
>Reporter: Marc
>Priority: Major
>
> I set up a testbed with some partitions and inspected carefully the behaviour 
> on the Kafka /data folder.
> It looks like when the active segment qualifies for rolling it is not 
> effectively closed until a new record arrives. Thus, it cannot be compacted in 
> a timely and deterministic manner by means of max.compaction.lag.ms, for 
> instance.
> The same problem occurs with delete.retention.ms. Once compaction has happened 
> and the canonical latest state of a key is a single tombstone on the compacted 
> tail, we must wait for an arbitrary record to arrive before deletion is 
> triggered, just as before.
> I expected the log.preallocate property to create a new segment file once the 
> current one is marked for rolling and no new records arrive, hoping it could 
> finally be rolled, but it seems to have no effect here. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] omkreddy commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2022-02-25 Thread GitBox


omkreddy commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050897685


   Sorry for missing this PR. I will help to review and merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2022-02-25 Thread GitBox


viktorsomogyi commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-1050877629


   @sachmo99 I pinged @rajinisivaram and @omkreddy in email, hopefully they 
respond and I'd be happy to rebase this change on top of trunk and push it in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2022-02-25 Thread GitBox


viktorsomogyi edited a comment on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-1050869998


   @lbradstreet @ijuma @junrao I'd like to revive this. This is a bit old I 
know, but I'd like to close out dangling open issues. Is this needed in the 
Apache Kafka distribution? (We've been using Murmur3 for the Cloudera distro 
for 2 years now, didn't have any problems with it since then so it's somewhat 
time tested.)
   If you think this is needed, then please reply within a week and I resolve 
the conflicts, otherwise I close this PR to keep my own list clear. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2022-02-25 Thread GitBox


viktorsomogyi commented on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-1050869998


   @lbradstreet @ijuma @junrao I'd like to revive this. This is a bit old I 
know, but I'd like to close out dangling open issues. Is this needed in the 
Apache Kafka distribution? (We've been using Murmur3 for the Cloudera distro 
for 2 years now, didn't have any problems with it since then so it's somewhat 
time tested.)
   If you think this is needed, then please reply within a week, otherwise I 
close this PR to keep my own list clear. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval

2022-02-25 Thread Matteo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061
 ] 

Matteo edited comment on KAFKA-13678 at 2/25/22, 1:40 PM:
--

hi guys, I'm Matteo and I'm working in the same team of Lorenzo.

I don't quite get your point about determinism. If the punctuator sets the next 
ring-time with respect to the current event timestamp, determinism would hold 
perfectly even in case of reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal, when 
things are wrong. Let's imagine we want to be notified when a sensor sends an 
"error" signal and no "ok" signals in the next 5 minutes. Something like "if 
you're in an error state and the situation doesn't change for 5 minutes, then I 
want to take a particular action". On the other hand, if we receive an error 
signal but an "ok" signal arrives in the next 5 minutes, then we don't care 
about the error.
Now, let's imagine this situation: we receive an "error" signal from sensor 1 
at event time t0. After a while we receive an "error" signal at event time t1, 
this time from sensor 2. We expect to be woken up at time t0+5min and at time 
t1+5min to take the appropriate actions (as long as we do not receive any "ok" 
signal in the meantime). First, let's clarify that the time reference 
necessarily has to be the event time: indeed, if we receive an "error" event 
and ingestion stops for 10 minutes (for example because of a network problem), 
we don't want to trigger any action, as the sensor could have sent an "ok" 
signal in the meantime that we weren't yet able to consume.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, a punctuator that sets the "wake up" trigger with respect 
to the current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" approach, as the 
granularity (and thus the precision) of the time measurement depends on the 
granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are in a 
"real time" or "reprocess" situation, preserving determinism (please give me a 
counter-example showing why determinism wouldn't hold).


was (Author: JIRAUSER285799):
hi guys, I'm Matteo and I'm working in the same team of Lorenzo.

I can't get your point about determinism. If the punctuator set the next 
ring-time with respect to the current event-timestamp, the determinism would 
perfectly hold even in case of reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal, when 
things are wrong. Let's imagine we want to be notified when a sensor sends an 
"error" signal and no "ok" signals in the next 5 minutes. Something like "if 
you're in an error state and the situation doesn't change for 5 minutes, then I 
want to take a particular action". On the other hand, if we receive an error 
signal but an "ok" signal arrives in the next 5 minutes, then we don't care 
about the error.
Now, let's imagine this situation: we receive an "error" signal from sensor 1 at 
event time t0. A while later we receive an "error" signal at event time t1, this 
time from sensor 2. We expect to be woken up at time t0+5min and at time t1+5min 
to take the appropriate actions (provided we do not receive any "ok" signal in 
the meantime). First, let's clarify that the time reference necessarily has to 
be the event time: indeed, if we receive an "error" event and ingestion stops 
for 10 minutes (for example because of a network problem), we don't want to 
trigger any action, because the sensor could have sent an "ok" signal in the 
meantime that we simply weren't able to consume yet.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, making a punctuator set the "wake up" trigger relative to the 
current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" reference, as the 
granularity (and therefore the precision) of the time measurement depends on the 
granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are in a 
"real time" or a "reprocessing" situation, preserving determinism (please give 
me a counter-example showing why determinism wouldn't hold).

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> -
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> 

[GitHub] [kafka] viktorsomogyi closed pull request #11491: KAFKA-13442: REST API endpoint for fetching a connector's config def

2022-02-25 Thread GitBox


viktorsomogyi closed pull request #11491:
URL: https://github.com/apache/kafka/pull/11491


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11788: KAFKA-13673: disable idempotence when config conflicts

2022-02-25 Thread GitBox


ijuma commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r814763487



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -461,27 +467,53 @@ private void 
postProcessAndValidateIdempotenceConfigs(final Map
 final Map originalConfigs = this.originals();
 final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
 configs.put(ACKS_CONFIG, acksStr);
-
-// For idempotence producers, values for `RETRIES_CONFIG` and 
`ACKS_CONFIG` need validation
-if (idempotenceEnabled()) {
-boolean userConfiguredRetries = 
originalConfigs.containsKey(RETRIES_CONFIG);
-if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-throw new ConfigException("Must set " + 
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent 
producer.");
+final boolean userConfiguredIdempotence = 
this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+boolean idempotenceEnabled = 
this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+boolean shouldDisableIdempotence = false;
+
+// For idempotence producers, values for `retries` and `acks` and 
`max.in.flight.requests.per.connection` need validation
+if (idempotenceEnabled) {
+final int retries = this.getInt(RETRIES_CONFIG);
+if (retries == 0) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + RETRIES_CONFIG + " 
to non-zero when using the idempotent producer.");
+}
+log.info("Idempotence will be disabled because {} is set to 
0.", RETRIES_CONFIG, retries);
+shouldDisableIdempotence = true;
 }
 
-boolean userConfiguredAcks = 
originalConfigs.containsKey(ACKS_CONFIG);
 final short acks = Short.valueOf(acksStr);
-if (userConfiguredAcks && acks != (short) -1) {
-throw new ConfigException("Must set " + ACKS_CONFIG + " to all 
in order to use the idempotent " +
+if (acks != (short) -1) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + ACKS_CONFIG + " to 
all in order to use the idempotent " +
 "producer. Otherwise we cannot guarantee 
idempotence.");
+}
+log.info("Idempotence will be disabled because {} is set to 
{}, not set to 'all'.", ACKS_CONFIG, acks);
+shouldDisableIdempotence = true;
 }
 
-boolean userConfiguredInflightRequests = 
originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-if (userConfiguredInflightRequests && 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < 
this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-throw new ConfigException("Must set " + 
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+final int inFlightConnection = 
this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < 
inFlightConnection) {
+if (userConfiguredIdempotence) {
+throw new ConfigException("Must set " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
 " to use the idempotent producer.");
+}
+log.warn("Idempotence will be disabled because {} is set to 
{}, which is greater than 5. " +
+"Please note that in v4.0.0 and onward, this will become 
an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+shouldDisableIdempotence = true;
 }
 }
+
+if (shouldDisableIdempotence) {
+configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+}
+
+// validate `transaction.id` after validating idempotence dependant 
configs because `enable.idempotence` config might be overridden
+idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence;

Review comment:
   Maybe we can set this to `false` in the `shouldDisableIdempotence` 
block? Seems a bit more natural.
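   For illustration only, one way the suggestion might look (a hedged sketch, not 
the code that was actually merged):
   ```java
   if (shouldDisableIdempotence) {
       configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
       // reviewer's suggestion: flip the local flag here instead of recomputing it later
       idempotenceEnabled = false;
   }
   ```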




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13695) Low-traffic topics don't roll (and therefore compact) nor delete tombstones

2022-02-25 Thread Marc (Jira)
Marc created KAFKA-13695:


 Summary: Low-traffic topics don't roll (and therefore compact) nor 
delete tombstones
 Key: KAFKA-13695
 URL: https://issues.apache.org/jira/browse/KAFKA-13695
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.1.0, 2.5.0
Reporter: Marc


I set up a testbed with some partitions and carefully inspected the behaviour in 
the Kafka /data folder.

It looks like when the active segment qualifies for rolling, it is not actually 
closed until a new record arrives. Thus, it cannot be compacted in a timely and 
deterministic manner by means of max.compaction.lag.ms, for instance.

The same problem occurs when setting delete.retention.ms. Once compaction has 
happened and the canonical latest state of a key is a single tombstone in the 
compacted tail, we must wait for an arbitrary record to arrive before the 
deletion is triggered, just as before.

I expected the log.preallocate property to create a new segment file once the 
current one was marked for rolling and no new records arrived, hoping it could 
finally be rolled, but it seems to have no effect. 
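
For reference, a minimal sketch of how the configs mentioned above can be applied 
with the Admin client (topic name, bootstrap address and values are placeholders); 
per the observations above, these settings alone do not force an idle active 
segment to roll:

{code:java}
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ApplyCompactionConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            // Bound the compaction delay and tombstone retention at the topic level.
            Collection<AlterConfigOp> ops = Arrays.asList(
                new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "600000"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("delete.retention.ms", "60000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
{code}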



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11705: KAFKA-9847: add config to set default store type

2022-02-25 Thread GitBox


showuon commented on a change in pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#discussion_r814753767



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -81,14 +81,19 @@ public StreamsBuilder() {
 internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
 }
 
-protected StreamsBuilder(final TopologyConfig topologyConfigs) {
+/**
+ * Create a {@code StreamsBuilder} instance.
+ *
+ * @param topologyConfigsthe streams configs that apply at the 
topology level. Please refer to {@link TopologyConfig} for more detail
+ */
+public StreamsBuilder(final TopologyConfig topologyConfigs) {
 topology = getNewTopology(topologyConfigs);
 internalTopologyBuilder = topology.internalTopologyBuilder;
 internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
 }
 
 protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review comment:
   Yes, I agree with you. This should be part of KIP-591, making `TopologyConfig` 
a public API. It has been moved out of the `internals.namedtopology` package 
now. Thanks.
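   For context, a hedged usage sketch once the constructor is public (this assumes 
a `TopologyConfig(StreamsConfig)` constructor in the non-internal package; the 
exact package and signatures may differ):
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.TopologyConfig;

   final Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

   // Topology-level settings (e.g. the new default store type) travel with TopologyConfig.
   final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
   ```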




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13693) Config in memory got inconsistency value

2022-02-25 Thread Augusto Hack (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498090#comment-17498090
 ] 

Augusto Hack commented on KAFKA-13693:
--

[~showuon] here are some additional details:
 * Kafka version: 2.8.1
 * partitions: 3
 * policy: compact
 * the znode for the topic was never modified (version was 0, and ctime and 
mtime equal)
 * brokers existed before the topic was created (checked via timestamp of the 
znodes in `/brokers/ids`)

Observed bug:
 * The LogConfig object of one of the partitions in the leader had the flag 
delete set to true, every other LogConfig had the correct value (true for 
compact, false for delete). Checked with a memory dump and querying using 
Memory Analyzer (MAT)
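
One way to cross-check what the cluster reports for the topic is a describeConfigs 
call (a minimal sketch; topic name and bootstrap address are placeholders). Note 
that, per the above, the stale value lived only in the in-memory LogConfig and 
needed a heap dump to spot, so this check may still return the expected "compact":

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CheckCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            // Reports the topic configuration as seen by the cluster's config repository.
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
        }
    }
}
{code}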

> Config in memory got inconsistency value
> 
>
> Key: KAFKA-13693
> URL: https://issues.apache.org/jira/browse/KAFKA-13693
> Project: Kafka
>  Issue Type: Bug
>Reporter: Aiqin Zhang
>Priority: Minor
> Attachments: Screenshot from 2022-02-25 14-34-36.png
>
>
> We have a Kafka cluster with a topic configured with cleanup.policy = compact.
> But we found that, after the retention timer elapsed, data got deleted on one of 
> the partitions.
>  
> Further investigation shows that in memory the value is actually "delete", which 
> is inconsistent.
>  
> The topic was created via Terraform in bulk, around 250 topics together, and the 
> operation returned success.
>  
> We restarted the broker to correct the in-memory setting.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval

2022-02-25 Thread Matteo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061
 ] 

Matteo edited comment on KAFKA-13678 at 2/25/22, 11:07 AM:
---

hi guys, I'm Matteo and I'm working in the same team of Lorenzo.

I don't get your point about determinism. If the punctuator set the next ring 
time relative to the current event timestamp, determinism would still hold 
perfectly, even in the case of reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal, when 
things are wrong. Let's imagine we want to be notified when a sensor sends an 
"error" signal and no "ok" signals in the next 5 minutes. Something like "if 
you're in an error state and the situation doesn't change for 5 minutes, then I 
want to take a particular action". On the other hand, if we receive an error 
signal but an "ok" signal arrives in the next 5 minutes, then we don't care 
about the error.
Now, let's imagine this situation: we receive an "error" signal from sensor 1 at 
event time t0. A while later we receive an "error" signal at event time t1, this 
time from sensor 2. We expect to be woken up at time t0+5min and at time t1+5min 
to take the appropriate actions (provided we do not receive any "ok" signal in 
the meantime). First, let's clarify that the time reference necessarily has to 
be the event time: indeed, if we receive an "error" event and ingestion stops 
for 10 minutes (for example because of a network problem), we don't want to 
trigger any action, because the sensor could have sent an "ok" signal in the 
meantime that we simply weren't able to consume yet.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, making a punctuator set the "wake up" trigger relative to the 
current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" reference, as the 
granularity (and therefore the precision) of the time measurement depends on the 
granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are in a 
"real time" or a "reprocessing" situation, preserving determinism (please give 
me a counter-example showing why determinism wouldn't hold).


was (Author: JIRAUSER285799):
I don't get your point about determinism. If the punctuator set the next ring 
time relative to the current event timestamp, determinism would still hold 
perfectly, even in the case of reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal, when 
things are wrong. Let's imagine we want to be notified when a sensor sends an 
"error" signal and no "ok" signals in the next 5 minutes. Something like "if 
you're in an error state and the situation doesn't change for 5 minutes, then I 
want to take a particular action". On the other hand, if we receive an error 
signal but an "ok" signal arrives in the next 5 minutes, then we don't care 
about the error.
Now, let's imagine this situation: we receive an "error" signal from sensor 1 at 
event time t0. A while later we receive an "error" signal at event time t1, this 
time from sensor 2. We expect to be woken up at time t0+5min and at time t1+5min 
to take the appropriate actions (provided we do not receive any "ok" signal in 
the meantime). First, let's clarify that the time reference necessarily has to 
be the event time: indeed, if we receive an "error" event and ingestion stops 
for 10 minutes (for example because of a network problem), we don't want to 
trigger any action, because the sensor could have sent an "ok" signal in the 
meantime that we simply weren't able to consume yet.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, making a punctuator set the "wake up" trigger relative to the 
current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" reference, as the 
granularity (and therefore the precision) of the time measurement depends on the 
granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are in a 
"real time" or a "reprocessing" situation, preserving determinism (please give 
me a counter-example showing why determinism wouldn't hold).

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> -
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
>  Issue Type: Improvement
>  

[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-25 Thread GitBox


kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814678429



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -386,11 +397,143 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
*/
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: 
PartitionFetchState, topicPartition: TopicPartition): Boolean = {
 !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && 
quota.isQuotaExceeded
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,

Review comment:
   This method is doing a lot of things, and it is worthwhile thinking 
about how to simplify it. In its current form, it is going to be hard to test 
it.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-25 Thread GitBox


kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948



##
File path: core/src/main/scala/kafka/log/BaseIndex.scala
##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and 
rename of the index files.

Review comment:
   > This class represents a common abstraction for operations like delete 
and rename of the index files.
   
   This class is slim in functionality, and I don't feel there is any real 
benefit for introducing this.
   Also for the future, it is not clear to me what operations can be included 
in this class, and which ones can't be.
   I feel that the earlier design without this base class was simpler.
   Are we planning to add new functionality in the future into this class?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-25 Thread GitBox


kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948



##
File path: core/src/main/scala/kafka/log/BaseIndex.scala
##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and 
rename of the index files.

Review comment:
   > This class represents a common abstraction for operations like delete 
and rename of the index files.
   
   This class is slim in functionality, and I don't feel there is any real 
benefit for introducing this.
   Also for the future, it is not clear to me what operations can be included 
in this class, and which ones can't be.
   I feel that the earlier design without this base class was simpler.
   Were you planning to add new functionality in the future into this class?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval

2022-02-25 Thread Matteo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498061#comment-17498061
 ] 

Matteo commented on KAFKA-13678:


I don't get your point about determinism. If the punctuator set the next ring 
time relative to the current event timestamp, determinism would still hold 
perfectly, even in the case of reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal, when 
things are wrong. Let's imagine we want to be notified when a sensor sends an 
"error" signal and no "ok" signals in the next 5 minutes. Something like "if 
you're in an error state and the situation doesn't change for 5 minutes, then I 
want to take a particular action". On the other hand, if we receive an error 
signal but an "ok" signal arrives in the next 5 minutes, then we don't care 
about the error.
Now, let's imagine this situation: we receive an "error" signal from sensor 1 at 
event time t0. A while later we receive an "error" signal at event time t1, this 
time from sensor 2. We expect to be woken up at time t0+5min and at time t1+5min 
to take the appropriate actions (provided we do not receive any "ok" signal in 
the meantime). First, let's clarify that the time reference necessarily has to 
be the event time: indeed, if we receive an "error" event and ingestion stops 
for 10 minutes (for example because of a network problem), we don't want to 
trigger any action, because the sensor could have sent an "ok" signal in the 
meantime that we simply weren't able to consume yet.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, making a punctuator set the "wake up" trigger relative to the 
current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" reference, as the 
granularity (and therefore the precision) of the time measurement depends on the 
granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are in a 
"real time" or a "reprocessing" situation, preserving determinism (please give 
me a counter-example showing why determinism wouldn't hold).
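
To make the use case concrete, here is a minimal sketch of what we have to do 
today (assumptions: the Kafka Streams 3.x Processor API, a key-value store named 
"pending-errors" keyed by sensor id and registered with the topology, and plain 
"error"/"ok" string values). Each "error" stores an event-time deadline, each 
"ok" clears it, and a frequent STREAM_TIME punctuator scans for expired 
deadlines; the precision is still limited by how often records arrive.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class ErrorTimeoutProcessor implements Processor<String, String, String, String> {
    private static final long TIMEOUT_MS = Duration.ofMinutes(5).toMillis();

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> pendingErrors; // must be registered with the topology

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.pendingErrors = context.getStateStore("pending-errors");
        // Poll the deadlines every 10 seconds of stream time; the granularity of the
        // check is bounded by how often stream time advances, i.e. by incoming records.
        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::expire);
    }

    @Override
    public void process(final Record<String, String> record) {
        if ("error".equals(record.value())) {
            pendingErrors.put(record.key(), record.timestamp() + TIMEOUT_MS); // event-time deadline
        } else if ("ok".equals(record.value())) {
            pendingErrors.delete(record.key()); // error cleared before the deadline
        }
    }

    private void expire(final long streamTime) {
        try (KeyValueIterator<String, Long> iter = pendingErrors.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, Long> entry = iter.next();
                if (entry.value <= streamTime) {
                    context.forward(new Record<>(entry.key, "error-timeout", streamTime));
                    pendingErrors.delete(entry.key);
                }
            }
        }
    }
}
{code}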

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> -
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Lorenzo Cagnatel
>Priority: Major
>
> Scheduling a punctuator using stream time, the first punctuation occurs 
> immediately as documented, but the second one is not triggered at *t_schedule 
> + interval* but it could happen before that time. 
> For example, assume that we schedule a punctuation every 10 sec at timestamp 
> 5 (t5). The system now works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd schedule occurs after 5 seconds from the first one, 
> breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
> * Schedules a punctuation for the processor
> *
> * @param interval the interval in milliseconds
> * @param type the punctuation type
> * @throws IllegalStateException if the current node is not null
> */
> public Cancellable schedule(final long interval, final PunctuationType type, 
> final Punctuator punctuator) {
>switch (type) {
>   case STREAM_TIME:
>  // align punctuation to 0L, punctuate as soon as we have data
>  return schedule(0L, interval, type, punctuator);
>   case WALL_CLOCK_TIME:
>  // align punctuation to now, punctuate after interval has elapsed
>  return schedule(time.milliseconds() + interval, interval, type, 
> punctuator);
>   default:
>  throw new IllegalArgumentException("Unrecognized PunctuationType: " 
> + type);
>}
> }{code}
> when, in case of stream time, it calls *schedule* with {*}startTime=0{*}.
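
A hedged sketch of the alternative described above (hypothetical code, not the 
actual Kafka implementation; currentStreamTime() stands in for however the task 
would expose its current stream time): the STREAM_TIME branch would anchor the 
first punctuation one full interval after the stream time observed at scheduling, 
so consecutive punctuations are always at least one interval apart.

{code:java}
case STREAM_TIME:
    // Hypothetical: anchor to the stream time seen at schedule() time instead of 0L.
    return schedule(currentStreamTime() + interval, interval, type, punctuator);
{code}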



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] kowshik commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2022-02-25 Thread GitBox


kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948



##
File path: core/src/main/scala/kafka/log/BaseIndex.scala
##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and 
rename of the index files.

Review comment:
   > This class represents a common abstraction for operations like delete 
and rename of the index files.
   
   This class is slim in functionality, and I don't feel there is any real 
benefit for introducing this.
   It is not clear to me going forward what operations can be included in this 
class, and which ones can't be.
   Are you planning to add new functionality to this class in the future that 
you want to introduce in this PR?
   If not, I feel that the earlier design without this base class was simpler.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -623,40 +660,25 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return false if
-   * 1) the request succeeded or
-   * 2) was fenced and this thread haven't received new epoch,
-   * which means we need not backoff and retry. True if there was a retriable 
error.
-   */
-  private def handleOutOfRangeError(topicPartition: TopicPartition,
-fetchState: PartitionFetchState,
-requestEpoch: Optional[Integer]): Boolean 
= {
-try {
-  val newFetchState = fetchOffsetAndTruncate(topicPartition, 
fetchState.topicId, fetchState.currentLeaderEpoch)
-  partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-  info(s"Current offset ${fetchState.fetchOffset} for partition 
$topicPartition is " +
-s"out of range, which typically implies a leader change. Reset fetch 
offset to ${newFetchState.fetchOffset}")
-  false
-} catch {
-  case _: FencedLeaderEpochException =>
-onPartitionFenced(topicPartition, requestEpoch)
-
-  case e @ (_ : UnknownTopicOrPartitionException |
-_ : UnknownLeaderEpochException |
-_ : NotLeaderOrFollowerException) =>
-info(s"Could not fetch offset for $topicPartition due to error: 
${e.getMessage}")
-true
-
-  case e: Throwable =>
-error(s"Error getting offset for partition $topicPartition", e)
-true
-}
-  }
-
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch 
offset.
+   * It returns the next fetch state. It fetches the  log-start-offset or 
local-log-start-offset based on
+   * `fetchFromLocalLogStartOffset` flag. This is used in truncation by 
passing it to the given `truncateAndBuild`
+   * function.
+   *
+   * @param topicPartition   topic partition
+   * @param topicId  topic id
+   * @param currentLeaderEpoch   current leader epoch maintained by 
this follower replica.
+   * @param truncateAndBuild Function to truncate for the given 
epoch and offset. It returns the next fetch offset value.
+   * @param fetchFromLocalLogStartOffset Whether to fetch from 
local-log-start-offset or log-start-offset. If true, it
+   * requests the local-log-start-offset 
from the leader, else it requests
+   * log-start-offset from the leader. 
This is used in sending the value to the
+   * given `truncateAndBuild` function.
+   * @return
*/
-  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, 
topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+  private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: 
TopicPartition,

Review comment:
   This method `fetchOffsetAndApplyTruncateAndBuild` is currently doing a 
number of things, which is clear from the method name itself. It will be hard to 
cover all the cases in unit tests. 

[GitHub] [kafka] ecararus edited a comment on pull request #11803: KAFKA-13691: Rename target topic to custom name

2022-02-25 Thread GitBox


ecararus edited a comment on pull request #11803:
URL: https://github.com/apache/kafka/pull/11803#issuecomment-1050407569


   Related to ISSUE : 
[KAFKA-13691](https://issues.apache.org/jira/browse/KAFKA-13691)
   
   When I use MirrorMaker it replicates the topics with a different name; the 
name of the target topic is built as
   
   sourceClusterAlias + separator + topic  

   
   implementation is 
[here](https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java#L49).
   
   The proposed improvement consists of:
   Adding a new config property key: target.replication.policy.topic_map
   
   E.g. 
   
   source-> target.replication.policy.class= 
org.apache.kafka.connect.mirror.RenameTopicReplicationPolicy
   
   source->target.replication.policy.separator = .
   source->target.replication.policy.topic_map= 
SOURCER_TOPIC_A,DESTINATION_TOPIC_A; SOURCER_TOPIC_B,DESTINATION_TOPIC_B; 
   and adding a new RenameTopicReplicationPolicy which will be responsible for:
   1. overriding the configure method to load target.replication.policy.topic_map 
into a Map where the key is the source topic name and the value is the target 
topic name (see the sketch after this list)
   
   2. overriding formatRemoteTopic so the target topic name is looked up from 
that map:
   
   ```java
   @Override
   public String formatRemoteTopic(String sourceClusterAlias, String topic) {
       String targetTopic = topicMap.containsKey(topic) ? topicMap.get(topic) : topic;
       return super.formatRemoteTopic(sourceClusterAlias, targetTopic);
   }
   ```
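   
   Putting the two steps together, a hedged sketch of the proposed policy (the 
config key name and the map format follow the proposal above and are assumptions, 
not an existing Kafka API):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

   public class RenameTopicReplicationPolicy extends DefaultReplicationPolicy {
       // Assumed key, following the naming of the existing replication.policy.* settings.
       public static final String TOPIC_MAP_CONFIG = "replication.policy.topic_map";

       private final Map<String, String> topicMap = new HashMap<>();

       @Override
       public void configure(Map<String, ?> props) {
           super.configure(props); // keep DefaultReplicationPolicy's separator handling
           Object raw = props.get(TOPIC_MAP_CONFIG);
           if (raw != null) {
               // Expected format: "SOURCE_TOPIC_A,DESTINATION_TOPIC_A; SOURCE_TOPIC_B,DESTINATION_TOPIC_B"
               for (String pair : raw.toString().split(";")) {
                   String[] parts = pair.trim().split(",");
                   if (parts.length == 2) {
                       topicMap.put(parts[0].trim(), parts[1].trim());
                   }
               }
           }
       }

       @Override
       public String formatRemoteTopic(String sourceClusterAlias, String topic) {
           String targetTopic = topicMap.getOrDefault(topic, topic);
           return super.formatRemoteTopic(sourceClusterAlias, targetTopic);
       }
   }
   ```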


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12260) PartitionsFor should not return null value

2022-02-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498036#comment-17498036
 ] 

Bruno Cadonna commented on KAFKA-12260:
---

[~calohmn] Thanks for the heads-up! 

> PartitionsFor should not return null value
> --
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
> Fix For: 3.0.0
>
>
> consumer.partitionsFor() could return null value when topic was not found. 
> This was not properly documented and was error-prone when the return type was 
> a list. We should fix the logic internally to prevent partitionsFor returning 
> null result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-12260) PartitionsFor should not return null value

2022-02-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12260.
---
Resolution: Fixed

> PartitionsFor should not return null value
> --
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
> Fix For: 3.0.0
>
>
> consumer.partitionsFor() could return null value when topic was not found. 
> This was not properly documented and was error-prone when the return type was 
> a list. We should fix the logic internally to prevent partitionsFor returning 
> null result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-12260) PartitionsFor should not return null value

2022-02-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-12260:
--
Fix Version/s: 3.0.0

> PartitionsFor should not return null value
> --
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
> Fix For: 3.0.0
>
>
> consumer.partitionsFor() could return null value when topic was not found. 
> This was not properly documented and was error-prone when the return type was 
> a list. We should fix the logic internally to prevent partitionsFor returning 
> null result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12260) PartitionsFor should not return null value

2022-02-25 Thread Carsten Lohmann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498027#comment-17498027
 ] 

Carsten Lohmann commented on KAFKA-12260:
-

It looks like this is fixed in Kafka client 3.0.0 and 3.1.0:
https://github.com/apache/kafka/commit/e4f2f6f6e82cafbdea785d53521b96fe062e172d
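
For callers that have to work across client versions, a hedged defensive sketch 
("my-topic" is a placeholder; pre-fix clients may return null for an unknown 
topic, while fixed versions return an empty list):

{code:java}
// consumer is an org.apache.kafka.clients.consumer.KafkaConsumer<?, ?>
List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
if (partitions == null || partitions.isEmpty()) {
    // topic not found, or metadata not yet available
} else {
    partitions.forEach(p -> System.out.println(p.topic() + "-" + p.partition()));
}
{code}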

> PartitionsFor should not return null value
> --
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> consumer.partitionsFor() could return null value when topic was not found. 
> This was not properly documented and was error-prone when the return type was 
> a list. We should fix the logic internally to prevent partitionsFor returning 
> null result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] RivenSun2 commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


RivenSun2 commented on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050676944


   @showuon 
   Sorry, I made a mistake, the `map` returned by 
`config.valuesWithPrefixOverride(prefix)` is not a normal map, it is of type 
`AbstractConfig.RecordingMap`.
   `RecordingMap#get(Object key)` will call the `AbstractConfig#ignore(String 
key)` method to change the `AbstractConfig.used` variable.
   Forgive my mistakes.
   
   However, if we add 
`assertFalse(config.unknown().contains("prefix.sasl.mechanism"));` after 
`valuesWithPrefixOverride.get("sasl.mechanism")`,
   the test case will fail, because unknownKeys is only derived from originals 
and values: its value is originalKeys.removeAll(valueKeys).
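   
   In set terms (illustrative only, paraphrasing the description above rather 
than quoting the actual AbstractConfig code):
   ```java
   Set<String> unknownKeys = new HashSet<>(originals().keySet()); // everything the user passed in
   unknownKeys.removeAll(values().keySet());                      // minus every key Kafka recognizes
   ```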


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


showuon commented on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050660740


   > In fact, in the testcases we discussed, `config.unused()` and 
`config.unknown()` will not change after `TestSecurityConfig config` is 
initialized. The following `config.valuesWithPrefixOverride(prefix)` and 
`valuesWithPrefixOverride.get()` will not cause the return value of 
`unused()` and `unknown()` to change
   
   Thanks for the explanation. But I don't think it is correct. If we check the 
original test below in `testValuesWithPrefixOverride`
   ```java
  // prefix overrides global
   assertTrue(config.unused().contains("prefix.sasl.mechanism"));
   assertTrue(config.unused().contains("sasl.mechanism"));
   assertEquals("GSSAPI", 
valuesWithPrefixOverride.get("sasl.mechanism"));
   assertFalse(config.unused().contains("sasl.mechanism"));
   assertFalse(config.unused().contains("prefix.sasl.mechanism"));
   ```
   We can see that `unused` was expected to contain those 2 configs. Then, after 
`valuesWithPrefixOverride.get("sasl.mechanism")`, they are used, that is, no 
longer contained in the `unused` set. 
   So, what I mean is that since we've changed the 1st assert into `unknown`:
   ```
   assertTrue(config.unknown().contains("prefix.sasl.mechanism"));
   ```
   Then, we should change the last assert to assert `unknown` there:
   ```
   assertFalse(config.unknown().contains("prefix.sasl.mechanism"));
   ```
   Does that make sense?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 edited a comment on pull request #11800: KAFKA-13689: Optimized the printing of AbstractConfig logs, and stripped unknownKeys from unusedKeys

2022-02-25 Thread GitBox


RivenSun2 edited a comment on pull request #11800:
URL: https://github.com/apache/kafka/pull/11800#issuecomment-1050656591


   In fact, in the testcases we discussed in `AbstractConfigTest`, 
`config.unused()` and `config.unknown()` will not change after 
`TestSecurityConfig config` is initialized.
   The following `config.valuesWithPrefixOverride(prefix)` and 
`valuesWithPrefixOverride.get()` will not cause the return value of 
`unused()` and `unknown()` to change.
   
   The main purpose of these testCases is to test whether the Map value 
returned by AbstractConfig#valuesWithPrefixOverride meets our expectations 
under different conditions.
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



