[jira] [Created] (KAFKA-13968) Broker should not generator snapshot until been unfenced

2022-06-07 Thread dengziming (Jira)
dengziming created KAFKA-13968:
--

 Summary: Broker should not generator snapshot until been unfenced
 Key: KAFKA-13968
 URL: https://issues.apache.org/jira/browse/KAFKA-13968
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: dengziming
Assignee: dengziming


 

There is a bug when computing `FeaturesDelta` which causes us to generate a 
snapshot on every commit.

 

[2022-06-08 13:07:43,010] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 0... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,222] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 2... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,727] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 3... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:44,228] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 4... (kafka.server.metadata.BrokerMetadataSnapshotter:66)

 

Until a broker has been unfenced, it does not start publishing metadata, so 
generating a snapshot before then is meaningless.
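A minimal Java sketch of the gating described above, assuming a broker-side hook that is consulted after each commit. `SnapshotGate`, `shouldSnapshot`, and `featuresChanged` are hypothetical names, not the `BrokerMetadataSnapshotter` API, and comparing finalized feature levels with `equals` is only one illustrative way to avoid treating a no-op `FeaturesDelta` as a change. Other snapshot triggers a real broker may have are deliberately omitted.

```java
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch: decide whether a broker should write a metadata
 * snapshot after a commit. Names are illustrative, not Kafka's API.
 */
public class SnapshotGate {

    /** True only if some finalized feature level actually differs. */
    public static boolean featuresChanged(Map<String, Short> previous, Map<String, Short> current) {
        // A delta that merely restates the same levels is not a change;
        // treating it as one would trigger a snapshot on every commit.
        return !Objects.equals(previous, current);
    }

    /** Never snapshot while fenced: the broker is not publishing metadata yet. */
    public static boolean shouldSnapshot(boolean unfenced,
                                         Map<String, Short> previous,
                                         Map<String, Short> current) {
        if (!unfenced) {
            return false;
        }
        return featuresChanged(previous, current);
    }

    public static void main(String[] args) {
        Map<String, Short> before = Map.of("metadata.version", (short) 6);
        Map<String, Short> after = Map.of("metadata.version", (short) 7);
        System.out.println(shouldSnapshot(false, before, after)); // false: still fenced
        System.out.println(shouldSnapshot(true, before, before)); // false: no real change
        System.out.println(shouldSnapshot(true, before, after));  // true
    }
}
```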



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891910326


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback 
configBackingStore.removeConnectorConfig(connName))) {
+throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+}
 callback.onCompletion(null, new Created<>(false, null));

Review Comment:
   On second thought, I think it's probably fine to leave things as they are 
without adding a manual invocation of `Callback::onCompletion` and a 
`return null`. Yes, `writeToConfigTopicAsLeader` may throw an exception, but so 
could writes to the config topic before the changes for this KIP were made 
(such as 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L835), 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L894), and 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L993)).
   
   If we were throwing an exception from within the body of the herder request, 
rather than from a method that the request invokes, it would make sense to 
replace the throw with a manual invocation of the callback with the exception. 
But merely calling a method that might throw an exception is different: it 
follows existing precedent in the code base without having to jump through 
special callback-related hoops.
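To illustrate the distinction drawn in the comment, here is a self-contained sketch of a herder-style request loop, assuming the loop catches exceptions at the request boundary and forwards them to the request's callback. `HerderRequestSketch`, its `Callback` stand-in, and `writeToConfigTopic` are hypothetical; this is not `DistributedHerder`'s actual code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;

/**
 * Hypothetical herder-style request loop: exceptions thrown anywhere inside a
 * request (including in helpers it calls) are caught once, at the request
 * boundary, and forwarded to that request's callback.
 */
public class HerderRequestSketch {

    /** Local stand-in for an (error, result) completion callback. */
    public interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    private static final class Request<V> {
        private final Callable<V> action;
        private final Callback<V> callback;

        Request(Callable<V> action, Callback<V> callback) {
            this.action = action;
            this.callback = callback;
        }

        void run() {
            V result;
            try {
                result = action.call();
            } catch (Exception e) {
                callback.onCompletion(e, null); // failure path, handled in one place
                return;
            }
            callback.onCompletion(null, result); // outside the try, so it runs at most once
        }
    }

    private final Queue<Request<?>> requests = new ArrayDeque<>();

    public <V> void addRequest(Callable<V> action, Callback<V> callback) {
        requests.add(new Request<>(action, callback));
    }

    /** Runs all queued requests on the calling thread. */
    public void tick() {
        Request<?> next;
        while ((next = requests.poll()) != null) {
            next.run();
        }
    }

    /** Hypothetical helper that throws, like a config-topic write after being fenced out. */
    static void writeToConfigTopic(boolean fencedOut) {
        if (fencedOut) {
            throw new IllegalStateException("worker was fenced out");
        }
    }

    /** Returns what the callback observed for a simulated connector deletion. */
    public static String deleteConnector(boolean fencedOut) {
        HerderRequestSketch herder = new HerderRequestSketch();
        StringBuilder observed = new StringBuilder();
        herder.addRequest(() -> {
            writeToConfigTopic(fencedOut); // may throw; no manual callback call needed here
            return "deleted";
        }, (error, result) -> observed.append(error != null ? "error: " + error.getMessage() : result));
        herder.tick();
        return observed.toString();
    }

    public static void main(String[] args) {
        System.out.println(deleteConnector(false)); // deleted
        System.out.println(deleteConnector(true));  // error: worker was fenced out
    }
}
```

Because the catch lives in the loop, a request body can call helpers that throw without wrapping each call in its own callback-handling code.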



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891910326


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback 
configBackingStore.removeConnectorConfig(connName))) {
+throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+}
 callback.onCompletion(null, new Created<>(false, null));

Review Comment:
   On second thought, I think it's probably fine to leave things as they are 
without adding a manual invocation of `Callback::onCompletion` and a 
`return null`. Yes, `writeToConfigTopicAsLeader` may throw an exception, but so 
could writes to the config topic before the changes for this KIP were made 
(such as 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L835), 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L894), and 
[here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L993)).
   
   If we were throwing an exception from within the body of the herder request, 
rather than from a method that the request invokes, it would make sense to 
replace the throw with a manual invocation of the callback with the exception. 
But merely calling a method that might throw an exception is fine, and follows 
existing precedent in the code base.






[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891907113


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+    private boolean transactionOpen;
+    private final LinkedHashMap commitableRecords;
+
+    private final TransactionManager transactionManager;
+    private final TransactionMetricsGroup transactionMetrics;
+
+    private final ConnectorOffsetBackingStore offsetBackingStore;
+    private final Runnable preProducerCheck;
+    private final Runnable postProducerCheck;
+
+    public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+                                       SourceTask task,
+                                       TaskStatus.Listener statusListener,
+                                       TargetState initialState,
+                                       Converter keyConverter,
+                                       Converter valueConverter,
+                                       HeaderConverter headerConverter,
+                                       TransformationChain transformationChain,
+                                       Producer producer,
+                                       TopicAdmin admin,
+                                       Map topicGroups,
+                                       CloseableOffsetStorageReader offsetReader,
+                                       OffsetStorageWriter offsetWriter,
+                                       ConnectorOffsetBackingStore offsetBackingStore,
+                                       WorkerConfig workerConfig,
+                                       ClusterConfigState configState,
+                                       ConnectMetrics connectMetrics,
+                                       ClassLoader loader,
+                                       Time time,
+                                       RetryWithToleranceOperator retryWithToleranceOperator,
+                                       StatusBackingStore 

[GitHub] [kafka] C0urante opened a new pull request, #12264: KAFKA-13967: Document guarantees for producer callbacks on transaction commit

2022-06-07 Thread GitBox


C0urante opened a new pull request, #12264:
URL: https://github.com/apache/kafka/pull/12264

   [Jira](https://issues.apache.org/jira/browse/KAFKA-13967)
   
   Also added some `` tags to help organize the rendered Javadocs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891901602



[jira] [Created] (KAFKA-13967) Guarantees for producer callbacks on transaction commit should be documented

2022-06-07 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-13967:
-

 Summary: Guarantees for producer callbacks on transaction commit 
should be documented
 Key: KAFKA-13967
 URL: https://issues.apache.org/jira/browse/KAFKA-13967
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Chris Egerton
Assignee: Chris Egerton


As discussed in 
https://github.com/apache/kafka/pull/11780#discussion_r891835221, part of the 
contract for a transactional producer is that all callbacks given to the 
producer will have been invoked and completed (either successfully or by 
throwing an exception) by the time that {{KafkaProducer::commitTransaction}} 
returns. This should be documented so that users of the clients library can 
rely on it and know that they are not on the hook for that kind of bookkeeping 
themselves.
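A self-contained sketch of the bookkeeping this guarantee would spare callers, written without the Kafka clients library: a hypothetical `TransactionalSenderSketch` runs each send callback on a background thread and lets `commitTransaction()` return only once every callback has completed, either normally or by throwing. The class, its methods, and the fake `"ack:"` metadata string are illustrative assumptions, not `KafkaProducer`'s implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
 * Hypothetical stand-in for a transactional producer, without Kafka:
 * callbacks run on background threads, and commitTransaction() returns only
 * after every callback registered in the transaction has completed.
 */
public class TransactionalSenderSketch implements AutoCloseable {

    private final ExecutorService ioThreads = Executors.newFixedThreadPool(2);
    private final List<CompletableFuture<Void>> pendingCallbacks = new ArrayList<>();

    /** Simulated asynchronous send; the callback receives a fake "metadata" string. */
    public synchronized void send(String record, BiConsumer<String, Exception> callback) {
        CompletableFuture<Void> completion = CompletableFuture.runAsync(() -> {
            try {
                callback.accept("ack:" + record, null);
            } catch (RuntimeException e) {
                // A callback that throws still counts as completed.
            }
        }, ioThreads);
        pendingCallbacks.add(completion);
    }

    /** Blocks until all callbacks for this transaction have run, then resets. */
    public synchronized void commitTransaction() {
        CompletableFuture.allOf(pendingCallbacks.toArray(new CompletableFuture<?>[0])).join();
        pendingCallbacks.clear();
    }

    @Override
    public void close() {
        ioThreads.shutdown();
    }

    /** Sends n records in one transaction; returns how many callbacks had completed at commit. */
    public static int callbacksCompletedAtCommit(int n) {
        AtomicInteger completed = new AtomicInteger();
        try (TransactionalSenderSketch sender = new TransactionalSenderSketch()) {
            for (int i = 0; i < n; i++) {
                sender.send("record-" + i, (metadata, error) -> completed.incrementAndGet());
            }
            sender.commitTransaction();
            // No extra waiting needed: commit returned only after every callback ran.
            return completed.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbacksCompletedAtCommit(10)); // 10
    }
}
```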





[GitHub] [kafka] hachikuji merged pull request #12258: MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft

2022-06-07 Thread GitBox


hachikuji merged PR #12258:
URL: https://github.com/apache/kafka/pull/12258





[jira] [Created] (KAFKA-13966) Flaky test `QuorumControllerTest.testUnregisterBroker`

2022-06-07 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13966:
---

 Summary: Flaky test `QuorumControllerTest.testUnregisterBroker`
 Key: KAFKA-13966
 URL: https://issues.apache.org/jira/browse/KAFKA-13966
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


We have seen the following assertion failure in 
`QuorumControllerTest.testUnregisterBroker`:

```

org.opentest4j.AssertionFailedError: expected: <2> but was: <0>
    at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:628)
    at org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker(QuorumControllerTest.java:494)

```

I reproduced it by running the test in a loop. It looks like what happens is 
that the BrokerRegistration request is able to get interleaved between the 
leader change event and the write of the bootstrap metadata. Something like 
this:
 # handleLeaderChange() start
 # appendWriteEvent(registerBroker)
 # appendWriteEvent(bootstrapMetadata)
 # handleLeaderChange() finish
 # registerBroker() -> writes broker registration to log
 # bootstrapMetadata() -> writes bootstrap metadata to log
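The interleaving above can be modelled with a single-threaded event queue. This is a hypothetical sketch, not `QuorumController` code: `ControllerQueueSketch` and `simulate` are illustrative names, and queueing the bootstrap write at the front is shown only to contrast the two orderings, not as the fix adopted for this issue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model of the interleaving: a single-threaded controller event
 * queue where the bootstrap write is queued as a separate event after a
 * broker registration has already been queued.
 */
public class ControllerQueueSketch {

    private final Deque<Runnable> queue = new ArrayDeque<>();
    private final List<String> metadataLog = new ArrayList<>();

    private void runQueuedEvents() {
        Runnable next;
        while ((next = queue.pollFirst()) != null) {
            next.run();
        }
    }

    /** Returns the order in which records reach the (simulated) metadata log. */
    public static List<String> simulate(boolean bootstrapAtFront) {
        ControllerQueueSketch controller = new ControllerQueueSketch();
        // 1. handleLeaderChange() starts.
        // 2. Meanwhile, a broker registration is appended to the queue.
        controller.queue.addLast(() -> controller.metadataLog.add("registerBroker"));
        // 3. The leader-change handler appends the bootstrap-metadata write.
        Runnable bootstrap = () -> controller.metadataLog.add("bootstrapMetadata");
        if (bootstrapAtFront) {
            controller.queue.addFirst(bootstrap); // illustrative contrast only
        } else {
            controller.queue.addLast(bootstrap);  // ordering described in this issue
        }
        // 4. handleLeaderChange() finishes; 5-6. queued events run in FIFO order.
        controller.runQueuedEvents();
        return controller.metadataLog;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [registerBroker, bootstrapMetadata]
        System.out.println(simulate(true));  // [bootstrapMetadata, registerBroker]
    }
}
```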





[jira] [Updated] (KAFKA-13966) Flaky test `QuorumControllerTest.testUnregisterBroker`

2022-06-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13966:

Description: 
We have seen the following assertion failure in 
`QuorumControllerTest.testUnregisterBroker`:
{code:java}
org.opentest4j.AssertionFailedError: expected: <2> but was: <0>
    at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:628)
    at org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker(QuorumControllerTest.java:494)
 {code}
I reproduced it by running the test in a loop. It looks like what happens is 
that the BrokerRegistration request is able to get interleaved between the 
leader change event and the write of the bootstrap metadata. Something like 
this:
 # handleLeaderChange() start
 # appendWriteEvent(registerBroker)
 # appendWriteEvent(bootstrapMetadata)
 # handleLeaderChange() finish
 # registerBroker() -> writes broker registration to log
 # bootstrapMetadata() -> writes bootstrap metadata to log

  was:
We have seen the following assertion failure in 
`QuorumControllerTest.testUnregisterBroker`:

```

org.opentest4j.AssertionFailedError: expected: <2> but was: <0> at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161) at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:628) at 
org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker(QuorumControllerTest.java:494)

```

I reproduced it by running the test in a loop. It looks like what happens is 
that the BrokerRegistration request is able to get interleaved between the 
leader change event and the write of the bootstrap metadata. Something like 
this:
 # handleLeaderChange() start
 # appendWriteEvent(registerBroker)
 # appendWriteEvent(bootstrapMetadata)
 # handleLeaderChange() finish
 # registerBroker() -> writes broker registration to log
 # bootstrapMetadata() -> writes bootstrap metadata to log


> Flaky test `QuorumControllerTest.testUnregisterBroker`
> --
>
> Key: KAFKA-13966
> URL: https://issues.apache.org/jira/browse/KAFKA-13966
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>





[GitHub] [kafka] showuon commented on pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


showuon commented on PR #12261:
URL: https://github.com/apache/kafka/pull/12261#issuecomment-1149358946

   @ijuma , thanks for the comments. I've updated the PR.





[GitHub] [kafka] showuon commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


showuon commented on code in PR #12261:
URL: https://github.com/apache/kafka/pull/12261#discussion_r891842384


##
docs/upgrade.html:
##
@@ -213,6 +213,9 @@ Notable changes in 3
 
 Notable changes in 
3.0.0
 
+Java 8 and Scala 2.12 support have been deprecated since Apache Kafka 
3.0 and will be removed in Apache Kafka 4.0.
+See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223;>KIP-750
+and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218;>KIP-751
 for more details.

Review Comment:
   Agree!






[GitHub] [kafka] hachikuji commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-06-07 Thread GitBox


hachikuji commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891835221



[jira] [Updated] (KAFKA-13965) Document broker-side socket-server-metrics

2022-06-07 Thread James Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng updated KAFKA-13965:

Labels: newbie newbie++  (was: )

> Document broker-side socket-server-metrics
> --
>
> Key: KAFKA-13965
> URL: https://issues.apache.org/jira/browse/KAFKA-13965
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.2.0
>Reporter: James Cheng
>Priority: Major
>  Labels: newbie, newbie++
>
> There are a bunch of broker JMX metrics in the "socket-server-metrics" space 
> that are not documented on kafka.apache.org/documentation
>  
>  * {_}MBean{_}: 
> kafka.server:{{{}type=socket-server-metrics,listener=,networkProcessor={}}}
>  ** From KIP-188: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
>  *  
> kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener=\{listenerName}
>  ** From KIP-612: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
> It would be helpful to get all the socket-server-metrics documented
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13965) Document broker-side socket-server-metrics

2022-06-07 Thread James Cheng (Jira)
James Cheng created KAFKA-13965:
---

 Summary: Document broker-side socket-server-metrics
 Key: KAFKA-13965
 URL: https://issues.apache.org/jira/browse/KAFKA-13965
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Affects Versions: 3.2.0
Reporter: James Cheng


There are a bunch of broker JMX metrics in the "socket-server-metrics" space 
that are not documented on kafka.apache.org/documentation

 
 * {_}MBean{_}: 
kafka.server:{{type=socket-server-metrics,listener=,networkProcessor=}}
 ** From KIP-188: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
 *  
kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener=\{listenerName}
 ** From KIP-612: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]

It would be helpful to get all the socket-server-metrics documented
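A quick way to see which of these MBeans a broker actually registers is to query its JMX server with a pattern. The sketch below uses only the standard `javax.management` API. The object names are copied from this ticket, the helper class `SocketServerMetricsProbe` is hypothetical, and it assumes the code runs inside (or is handed an `MBeanServer` for) the broker JVM and that listener names need no JMX quoting. Depending on how the metrics reporter maps metric names, `connection-accept-rate` may appear as an attribute rather than as a `name=` key, so treat the exact layout as something to verify against a running broker.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for listing broker socket-server-metrics MBeans via plain JMX.
final class SocketServerMetricsProbe {
    private SocketServerMetricsProbe() {}

    // Parses an object name, turning the checked exception into an unchecked one.
    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }

    // Property pattern: any kafka.server MBean with type=socket-server-metrics, whatever its other keys.
    static ObjectName allSocketServerMetrics() {
        return parse("kafka.server:type=socket-server-metrics,*");
    }

    // Object name for the KIP-612 metric as written in the ticket (assumes a plain listener name).
    static ObjectName connectionAcceptRate(String listenerName) {
        return parse("kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener="
                + listenerName);
    }

    // Returns the canonical names of all matching MBeans, sorted for stable output.
    static Set<String> list(MBeanServer server) {
        Set<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(allSocketServerMetrics(), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    // Convenience overload for code running inside the broker's JVM.
    static Set<String> listLocal() {
        return list(ManagementFactory.getPlatformMBeanServer());
    }
}
```

Calling `SocketServerMetricsProbe.listLocal()` from inside a broker would give a concrete inventory of what still needs documenting; outside a broker it simply returns an empty set.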

 





[GitHub] [kafka] hachikuji merged pull request #12086: Minor : cleanup.policy is a comma separated list

2022-06-07 Thread GitBox


hachikuji merged PR #12086:
URL: https://github.com/apache/kafka/pull/12086


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13942) LogOffsetTest occasionally hangs during Jenkins build

2022-06-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13942.
-
Resolution: Fixed

> LogOffsetTest occasionally hangs during Jenkins build
> -
>
> Key: KAFKA-13942
> URL: https://issues.apache.org/jira/browse/KAFKA-13942
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: David Arthur
>Priority: Minor
>
> [~hachikuji] parsed the log output of one of the recent stalled Jenkins 
> builds and singled out LogOffsetTest as a likely culprit for not completing.
> I looked closely at the following build which appeared to be stuck and found 
> this test case had STARTED but not PASSED or FAILED.
> 15:19:58  LogOffsetTest > 
> testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(String) > 
> kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(String)[2]
>  STARTED





[GitHub] [kafka] hachikuji merged pull request #12262: KAFKA-13942: Fix kraft timeout in LogOffsetTest

2022-06-07 Thread GitBox


hachikuji merged PR #12262:
URL: https://github.com/apache/kafka/pull/12262





[GitHub] [kafka] d-t-w commented on pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes

2022-06-07 Thread GitBox


d-t-w commented on PR #12260:
URL: https://github.com/apache/kafka/pull/12260#issuecomment-1149320749

   Hi @ijuma, I removed the `ul` tag.
   
   That format was originally requested in the comments here: 
https://issues.apache.org/jira/browse/KAFKA-13598
   
   More info here: https://kpow.io/articles/kafka-producer-breaking-change/
   
   I'm not sure including this line in the previous `li` is a good idea: it would 
mix this note into the context of a statement about Connect, whereas this 
breaking change impacts any user of Kafka where:
   
   1. The Kafka Cluster has brokers running version < 2.8.0, and
   2. The Kafka Cluster has ACLs configured, but not IDEMPOTENT_WRITE, and
   3. Producer configuration is default, or capable of being defaulted to 
idempotent, and
   4. The producing application is using Kafka-Clients version >= 3.2.0
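The four conditions above can be encoded as a quick pre-upgrade check. This is a hypothetical helper (`IdempotentWriteCheck` is not part of Kafka); it assumes purely numeric `major.minor.patch` version strings, and it treats a 3.2.0 client as affected, an assumption based on this note being added to the 3.2.0 notable changes.

```java
// Hypothetical pre-upgrade check; not part of Kafka.
final class IdempotentWriteCheck {
    private IdempotentWriteCheck() {}

    // Numeric comparison of dotted versions, so "2.10.0" sorts after "2.8.0".
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // True when all four conditions from the comment hold.
    static boolean likelyAffected(String brokerVersion,
                                  boolean aclsConfigured,
                                  boolean hasIdempotentWriteAcl,
                                  boolean producerDefaultsToIdempotent,
                                  String clientVersion) {
        boolean oldBroker = compareVersions(brokerVersion, "2.8.0") < 0;   // condition 1
        boolean missingAcl = aclsConfigured && !hasIdempotentWriteAcl;    // condition 2
        boolean newClient = compareVersions(clientVersion, "3.2.0") >= 0; // condition 4 (3.2.0 assumed affected)
        // condition 3 is passed in directly as producerDefaultsToIdempotent
        return oldBroker && missingAcl && producerDefaultsToIdempotent && newClient;
    }
}
```

If the check returns `true`, granting the `IDEMPOTENT_WRITE` ACL or explicitly setting `enable.idempotence=false` on the producer are the usual ways to avoid the failure.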
   
   
   





[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551333#comment-17551333
 ] 

Matthias J. Sax commented on KAFKA-13963:
-

{quote}Is it worth updating the java doc to mention this?
{quote}
Updating docs can never hurt :) – are you interested in doing a PR?
{quote}if you use the internal RecordCollector, which I feel should be better 
hidden from the streams api users
{quote}
Yes, you should NEVER use internal stuff... Not sure how we could "better hide" 
it though? Seems not to be possible as long as we are using Java 8...
{quote}I can open up a separate bug for that if it makes sense.
{quote}
Don't think it's a bug? It's (unfortunately) how Java works.

> Topology Description ignores context.forward
> 
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.2
>Reporter: Tomasz Kaszuba
>Priority: Minor
>
> I have a simple topology:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new RecordCollectorProcessor()
>           },
>           "source"
>         ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
>   private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>     override def process(key: String, value: String): Unit =
>       context().forward("key", "value", To.child("output"))
>     override def close(): Unit = ()
>   }
> {code}
> When I call topology.describe() I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run, since it will throw a runtime 
> exception, why is the To.child ignored?
> Taking it one point further if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new ContextForwardProcessor()
>           },
>           "source"
>         )
>         .addSink("sink", "output1", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")
>         .addSink("sink2", "output2", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")  {code}
> but have the processor output only to "output1", this is in no way reflected 
> in the described topology graph.
> I assume this is by design, since it's a lot more work to interpret what 
> context.forward is doing, but when I looked for this information in the 
> Javadoc I couldn't find it.
>  
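The reporter's assumption can be illustrated with a toy model: a description built purely from declared parent/child links has no way of knowing which child a processor will pick at runtime. `ToyTopology` below is a hypothetical Java sketch, not the Kafka Streams API; it only mimics two behaviours discussed here, namely that successors come from the declared wiring and that forwarding to an undeclared child fails (the real runtime throws an exception in that case, as the reporter notes).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Kafka Streams Topology API.
final class ToyTopology {
    // node name -> declared children, in insertion order
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Declares a node and links it under each named parent (like addProcessor/addSink parents).
    ToyTopology addNode(String name, String... parents) {
        if (children.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate node: " + name);
        }
        children.put(name, new ArrayList<>());
        for (String parent : parents) {
            List<String> siblings = children.get(parent);
            if (siblings == null) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
            siblings.add(name);
        }
        return this;
    }

    // What a describe()-style view can report: only the declared edges.
    List<String> successors(String name) {
        return new ArrayList<>(children.get(name));
    }

    // Runtime routing in the spirit of To.child(...): the target must be a declared child.
    String forward(String from, String childName) {
        if (!children.get(from).contains(childName)) {
            throw new IllegalStateException(childName + " is not a child of " + from);
        }
        return childName;
    }
}
```

With two declared sinks, `successors("process")` lists both regardless of which one the processor actually forwards to, which is the gap the reporter observed in `topology.describe()`.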





[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets

2022-06-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-10000:
--
Priority: Blocker  (was: Major)

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.





[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891811716


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -309,16 +355,70 @@ public void start() {
 @Override
 public void stop() {
 log.info("Closing KafkaConfigBackingStore");
-try {
-configLog.stop();
-} finally {
-if (ownTopicAdmin != null) {
-ownTopicAdmin.close();
-}
+
+if (fencableProducer != null) {
+Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), 
"fencable producer for config topic");
 }
+Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
+Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
+
 log.info("Closed KafkaConfigBackingStore");
 }
 
+@Override
+public void claimWritePrivileges() {
+if (usesFencableWriter && fencableProducer == null) {
+try {
+fencableProducer = createFencableProducer();
+fencableProducer.initTransactions();
+} catch (Exception e) {
+if (fencableProducer != null) {
+Utils.closeQuietly(() -> 
fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
+fencableProducer = null;
+}
+throw new ConnectException("Failed to create and initialize 
fencable producer for config topic", e);
+}
+}
+}
+
+private Map baseProducerProps(WorkerConfig workerConfig) {
+Map producerProps = new 
HashMap<>(workerConfig.originals());
+String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(workerConfig);
+producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, 
kafkaClusterId);
+return producerProps;
+}
+
+// Visible for testing
+Map fencableProducerProps(DistributedConfig workerConfig) {
+Map result = new 
HashMap<>(baseProducerProps(workerConfig));
+
+// Always require producer acks to all to ensure durable writes
+result.put(ProducerConfig.ACKS_CONFIG, "all");
+// Don't allow more than one in-flight request to prevent reordering 
on retry (if enabled)
+result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

Review Comment:
   Rebase complete; should be resolved now.
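For readers following the producer settings in `baseProducerProps` and `fencableProducerProps` above, here is a dependency-free sketch of the same overrides using the literal config keys behind the `ProducerConfig` constants. `FencableProducerPropsSketch` is hypothetical; it leaves out the metrics-context properties and whatever settings the truncated remainder of `fencableProducerProps` adds (for example any transactional ID).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the config-topic producer overrides shown in the diff.
final class FencableProducerPropsSketch {
    private FencableProducerPropsSketch() {}

    static Map<String, Object> fencableProducerProps(Map<String, Object> workerOriginals) {
        // Start from a copy of the worker's own settings, then force what the config topic needs.
        Map<String, Object> props = new HashMap<>(workerOriginals);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("delivery.timeout.ms", Integer.MAX_VALUE);
        // Durable writes: wait for acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // One in-flight request, so a retried batch cannot be reordered behind a later one.
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }
}
```

Because the overrides are applied after copying the worker originals, a worker-level `acks=1` (say) cannot weaken the guarantees for config-topic writes.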



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -291,7 +328,16 @@ public void start() {
 log.info("Starting KafkaConfigBackingStore");
 // Before startup, callbacks are *not* invoked. You can grab a 
snapshot after starting -- just take care that
 // updates can continue to occur in the background
-configLog.start();
+try {
+configLog.start();
+} catch (UnsupportedVersionException e) {
+throw new ConnectException(
+"Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
++ "admin clients to read consumer offsets. Disable 
the worker's exactly-once support "
++ "for source connectors, or use a new Kafka 
broker version.",

Review Comment:
   Rebase complete; should be resolved now.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -784,6 +845,14 @@ private static Map 
connectorClientConfigOverrides(ConnectorTaskI
 return clientOverrides;
 }
 
+private String transactionalId(ConnectorTaskId id) {

Review Comment:
   Rebase complete; should be resolved now.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") 
String connector,
 completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", headers, taskConfigs, forward);
 }
 
+@PUT
+@Path("/{connector}/fence")
+public Response fenceZombies(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean forward,
+ final byte[] requestBody) throws Throwable {
+FutureCallback cb = new FutureCallback<>();
+herder.fenceZombies(connector, cb, 
InternalRequestSignature.fromHeaders(requestBody, headers));
+completeOrForwardRequest(cb, 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12250: KAFKA-13935 Fix static usages of IBP in KRaft mode

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12250:
URL: https://github.com/apache/kafka/pull/12250#discussion_r891771090


##
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##
@@ -106,7 +110,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
   .setPartitionIndex(topicPartition.partition)
   .setCurrentLeaderEpoch(currentLeaderEpoch)
   .setTimestamp(earliestOrLatest)))
-val requestBuilder = 
ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, 
brokerConfig.brokerId)
+val meteadataVersion = metadataVersionSupplier()

Review Comment:
   nit: typo `meteadataVersion`






[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891769739


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+partition.currentVoters().forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+v.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+v.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+partition.observers().forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+o.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+});
+QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, 
observers);
+return info;
+}
+
+@Override
+DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+return new Builder(DescribeQuorumRequest.singletonRequest(
+new TopicPartition(METADATA_TOPIC_NAME, 
METADATA_TOPIC_PARTITION.partition())));
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+throw 
Errors.forCode(quorumResponse.data().errorCode()).exception();
+}
+if (quorumResponse.data().topics().size() > 1) {

Review Comment:
   Maybe we should check if size is not equal to 1 here and below. I guess an 
empty list is also possible.
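Two small helpers capture the points in this hunk and the review comment: mapping the `-1` sentinel for an unknown timestamp to `OptionalLong.empty()`, and rejecting any topic list whose size is not exactly one (so an empty list fails too). `QuorumResponseChecks` is a hypothetical name, and the choice of `IllegalStateException` is an assumption; the admin client may well surface a Kafka-specific exception instead.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helpers illustrating the -1 mapping and the size != 1 check.
final class QuorumResponseChecks {
    private QuorumResponseChecks() {}

    // The response uses -1 for "unknown"; expose that as an empty OptionalLong.
    static OptionalLong timestampOrEmpty(long raw) {
        return raw == -1 ? OptionalLong.empty() : OptionalLong.of(raw);
    }

    // Rejects zero entries as well as several, per the review suggestion.
    static <T> T requireExactlyOne(List<T> items, String what) {
        if (items.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one " + what + " but got " + items.size());
        }
        return items.get(0);
    }
}
```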






[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891768433


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   Yeah, I agree with this. Maybe we can just say that this class contains 
useful debugging state for KRaft replication.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891766251


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(0);
+Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+response.getVoterInfo(topicName, partition).forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+OptionalLong.of(v.lastFetchTimestamp()),
+OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+response.getObserverInfo(topicName, partition).forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+OptionalLong.of(o.lastFetchTimestamp()),
+OptionalLong.of(o.lastCaughtUpTimestamp())));
+});
+QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, 
observers);
+return info;
+}
+
+@Override
+DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

Review Comment:
   Never mind, this is an override.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2)

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r891635253


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -915,15 +918,21 @@ Set imbalancedPartitions() {
 return new HashSet<>(imbalancedPartitions);
 }
 
-ControllerResult 
alterPartition(AlterPartitionRequestData request) {
+ControllerResult alterPartition(
+ControllerRequestContext context,
+AlterPartitionRequestData request
+) {
 clusterControl.checkBrokerEpoch(request.brokerId(), 
request.brokerEpoch());
 AlterPartitionResponseData response = new AlterPartitionResponseData();
 List records = new ArrayList<>();
 for (AlterPartitionRequestData.TopicData topicData : request.topics()) 
{
 AlterPartitionResponseData.TopicData responseTopicData =
-new 
AlterPartitionResponseData.TopicData().setName(topicData.name());
+new AlterPartitionResponseData.TopicData().
+setTopicName(topicData.topicName()).
+setTopicId(topicData.topicId());
 response.topics().add(responseTopicData);
-Uuid topicId = topicsByName.get(topicData.name());
+Uuid topicId = topicData.topicId().equals(Uuid.ZERO_UUID) ?
+topicsByName.get(topicData.topicName()) : topicData.topicId();

Review Comment:
   This wasn't covered in the KIP, but when we cannot find the provided 
`TopicId`, should we return `UNKNOWN_TOPIC_ID`?
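One way the lookup could handle both request styles and the case raised here: use the topic ID when it is set, fall back to the name when it is the zero UUID, and return an error rather than a null ID when nothing matches. This is a hypothetical sketch; `java.util.UUID` stands in for Kafka's `Uuid`, and the error chosen for an unknown *name* (`UNKNOWN_TOPIC_OR_PARTITION`) is an assumption.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch; java.util.UUID stands in for org.apache.kafka.common.Uuid.
final class TopicIdResolution {
    private TopicIdResolution() {}

    // Stand-in for Uuid.ZERO_UUID: "no topic ID supplied" (older request versions).
    static final UUID ZERO = new UUID(0L, 0L);

    enum Outcome { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION }

    static final class Result {
        final Optional<UUID> topicId;
        final Outcome outcome;

        Result(Optional<UUID> topicId, Outcome outcome) {
            this.topicId = topicId;
            this.outcome = outcome;
        }
    }

    static Result resolve(UUID requestTopicId, String requestTopicName,
                          Map<String, UUID> topicsByName, Set<UUID> knownTopicIds) {
        if (ZERO.equals(requestTopicId)) {
            // Name-based request: look the ID up by name (error code here is an assumption).
            UUID byName = topicsByName.get(requestTopicName);
            return byName == null
                    ? new Result(Optional.empty(), Outcome.UNKNOWN_TOPIC_OR_PARTITION)
                    : new Result(Optional.of(byName), Outcome.NONE);
        }
        // ID-based request: an ID the controller does not know maps to UNKNOWN_TOPIC_ID.
        return knownTopicIds.contains(requestTopicId)
                ? new Result(Optional.of(requestTopicId), Outcome.NONE)
                : new Result(Optional.empty(), Outcome.UNKNOWN_TOPIC_ID);
    }
}
```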



##
clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplica.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IneligibleReplica extends ApiException {

Review Comment:
   nit: I think the usual convention is to add the "Exception" suffix



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -167,7 +168,8 @@ sealed trait PendingPartitionChange extends PartitionState {
 case class PendingExpandIsr(
   isr: Set[Int],
   newInSyncReplicaId: Int,
-  sentLeaderAndIsr: LeaderAndIsr
+  sentLeaderAndIsr: LeaderAndIsr,
+  partitionStateToRollBackTo: PartitionState

Review Comment:
   nit: Wonder if we could use a more concise name. Maybe `priorState` or 
`lastCommittedState`?



##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-alterPartitionRequest.topics.forEach { topicReq =>
-  topicReq.partitions.forEach { partitionReq =>
-partitionsToAlter.put(
-  new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-  LeaderAndIsr(
-alterPartitionRequest.brokerId,
-partitionReq.leaderEpoch,
-partitionReq.newIsr().asScala.toList.map(_.toInt),
-LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-partitionReq.partitionEpoch
-  )
-)
-  }
-}
-
-def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-  val resp = new AlterPartitionResponseData()
-  results match {
-case Right(error) =>
-  resp.setErrorCode(error.code)
-case Left(partitionResults) =>
-  resp.setTopics(new util.ArrayList())
-  partitionResults
-.groupBy { case (tp, _) => tp.topic }   // Group by topic
-.foreach { case (topic, partitions) =>
-  // Add each topic part to the response
-  val topicResp = new AlterPartitionResponseData.TopicData()
-.setName(topic)
-.setPartitions(new util.ArrayList())
-  resp.topics.add(topicResp)
-  partitions.foreach { case (tp, errorOrIsr) =>
-// Add each partition part to the response (new ISR or error)
-errorOrIsr 

[jira] [Comment Edited] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Tomasz Kaszuba (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551289#comment-17551289
 ] 

Tomasz Kaszuba edited comment on KAFKA-13963 at 6/7/22 9:07 PM:


Ok, this is what I thought. Is it worth updating the java doc to mention this? 
The developers I work with were surprised that context.forward is not covered. 
We rely heavily on the generated topology graphs for impact analysis.

Btw, I think you can get around the context forward exception and the need for 
registering sinks if you use the internal RecordCollector, which I feel should 
be better hidden from the streams api users since it's a class cast exception 
waiting to happen. I can open up a separate bug for that if it makes sense.
{code:java}
collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector 
{code}







[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Tomasz Kaszuba (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551289#comment-17551289
 ] 

Tomasz Kaszuba commented on KAFKA-13963:


Ok, this is what I thought. Is it worth updating the java doc to mention this? 
The developers I work with were surprised that context.forward is not covered. 
We rely heavily on the generated topology graphs for impact analysis.

Btw, I think you can get around the context forward exception and the need for 
registering sinks if you use the internal RecordCollector, which I feel should 
be better hidden from the streams api users since it's a class cast exception 
waiting to happen. I can open up a separate bug for that if it makes sense.
{code:java}
collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector 
{code}
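The cast above only works while the runtime context happens to implement the internal supplier interface; any other context (a test mock, for example) makes it throw a `ClassCastException`. A minimal Java sketch of the difference between an unchecked and a checked cast, using hypothetical stand-in types (`Context`, `CollectorSupplier`, `RuntimeContext`, `MockContext`) rather than the real Kafka Streams classes:

```java
import java.util.Optional;

// Hypothetical stand-ins; these are NOT the real Kafka Streams types.
interface Context { }

interface CollectorSupplier {
    String recordCollector();
}

// A context that happens to implement the internal interface.
class RuntimeContext implements Context, CollectorSupplier {
    public String recordCollector() {
        return "collector";
    }
}

// A context (e.g. a test mock) that does not implement it.
class MockContext implements Context { }

final class CheckedCast {
    private CheckedCast() { }

    // Checked variant: returns the collector only if the context exposes one.
    static Optional<String> collectorOf(Context context) {
        if (context instanceof CollectorSupplier) {
            return Optional.of(((CollectorSupplier) context).recordCollector());
        }
        return Optional.empty();
    }

    // Unchecked variant, equivalent to asInstanceOf in the Scala snippet:
    // throws ClassCastException for contexts that lack the interface.
    static String uncheckedCollectorOf(Context context) {
        return ((CollectorSupplier) context).recordCollector();
    }
}
```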

> Topology Description ignores context.forward
> 
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.2
>Reporter: Tomasz Kaszuba
>Priority: Minor
>
> I have a simple topology:
> {code:java}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] =
>         new RecordCollectorProcessor()
>     },
>     "source"
>   )
> {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
> private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>   override def process(key: String, value: String): Unit =
>     context().forward("key", "value", To.child("output"))
>
>   override def close(): Unit = ()
> }
> {code}
> When I call topology.describe(), I get this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run (it throws a runtime exception), why
> is the To.child ignored?
> Taking it one step further, if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] =
>         new ContextForwardProcessor()
>     },
>     "source"
>   )
>   .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
>   .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
> {code}
> and have the processor output only to "output1", this is not reflected in the
> described topology graph at all.
> I assume this is by design, since it's a lot more work to interpret what
> context.forward is doing, but when I looked for this information in the
> Javadoc I couldn't find it.
>  





[jira] [Assigned] (KAFKA-13898) metrics.recording.level is underdocumented

2022-06-07 Thread Richard Joerger (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Joerger reassigned KAFKA-13898:
---

Assignee: Richard Joerger

> metrics.recording.level is underdocumented
> --
>
> Key: KAFKA-13898
> URL: https://issues.apache.org/jira/browse/KAFKA-13898
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Tom Bentley
>Assignee: Richard Joerger
>Priority: Minor
>  Labels: newbie
>
> metrics.recording.level is only briefly described in the documentation. In 
> particular the recording level associated with each metric is not documented, 
> which makes it difficult to know the effect of changing the level.
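Background for this ticket: Kafka's recording levels are `INFO`, `DEBUG` and `TRACE` (the default is `INFO`), and a sensor is only recorded when its own level is no more verbose than the configured `metrics.recording.level`. The Java sketch below models that rule with a hypothetical `RecLevel` enum; it mirrors the ordering of `Sensor.RecordingLevel` but is not that class, and it says nothing about which level any real metric uses, which is exactly what the ticket asks to document.

```java
import java.util.Properties;

// Hypothetical model of recording levels, declared from least to most verbose.
// Mirrors the ordering of Kafka's Sensor.RecordingLevel; NOT that class.
enum RecLevel {
    INFO, DEBUG, TRACE;

    // A sensor at this level is recorded when the configured level is at least as verbose.
    boolean shouldRecord(RecLevel configured) {
        return compareTo(configured) <= 0;
    }
}

final class RecordingLevels {
    private RecordingLevels() { }

    // Reads metrics.recording.level from client properties, defaulting to INFO.
    static RecLevel configured(Properties props) {
        return RecLevel.valueOf(props.getProperty("metrics.recording.level", "INFO"));
    }
}
```

With the default `INFO`, a `DEBUG`-level sensor is skipped; raising the config to `DEBUG` turns it on while `TRACE` sensors stay off.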





[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551271#comment-17551271
 ] 

Matthias J. Sax commented on KAFKA-13963:
-

TopologyDescription only describes the structure of your graph of operators. In
your first example, you only added two nodes to the graph ("source" and
"process"); there is no node "output", and thus it's not contained in the
`TopologyDescription`.

It's not really possible to take the business logic (i.e., what `forward()` is
doing) into account; at least I have no idea how this could be done with
reasonable effort.

It's for sure not a bug. We should either close this ticket or change it into
a feature request.
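A toy Java model of the distinction drawn above, assuming a graph that records only the nodes and parent links registered up front. `ToyTopology` is hypothetical, not a Kafka class: its `describe()` is built purely from registered edges, while the child name passed to `forward()` is only checked when a record is sent, which matches the runtime failure the reporter mentions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy graph; it only knows what was registered when it was built.
final class ToyTopology {
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    ToyTopology addNode(String name, String... parents) {
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            List<String> successors = children.get(parent);
            if (successors == null) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
            successors.add(name);
        }
        return this;
    }

    // Structural description: derived only from registered edges.
    String describe() {
        StringBuilder sb = new StringBuilder();
        children.forEach((node, successors) -> sb.append(node).append(" --> ")
                .append(successors.isEmpty() ? "none" : String.join(", ", successors))
                .append('\n'));
        return sb.toString();
    }

    // Runtime forwarding: the target name is only validated when a record is sent.
    void forward(String from, String childName) {
        if (!children.getOrDefault(from, Collections.emptyList()).contains(childName)) {
            throw new IllegalStateException("Unknown downstream node: " + childName);
        }
    }
}
```

Because `describe()` never runs any processor code, it cannot tell which registered sink a processor actually uses; it lists every connected child.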



[jira] [Created] (KAFKA-13964) kafka-configs.sh end with UnsupportedVersionException when describing TLS user with quotas

2022-06-07 Thread Jakub Stejskal (Jira)
Jakub Stejskal created KAFKA-13964:
--

 Summary: kafka-configs.sh end with UnsupportedVersionException 
when describing TLS user with quotas 
 Key: KAFKA-13964
 URL: https://issues.apache.org/jira/browse/KAFKA-13964
 Project: Kafka
  Issue Type: Bug
  Components: admin, kraft
Affects Versions: 3.2.0
 Environment: Kafka 3.2.0 running on OpenShift 4.10 in KRaft mode 
managed by Strimzi
Reporter: Jakub Stejskal


Running kafka-configs.sh ends with
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not
support DESCRIBE_USER_SCRAM_CREDENTIALS when describing a TLS user with quotas
enabled.

 
{noformat}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --user CN=encrypted-arnost
{noformat}
got status code 1 and stderr:
{noformat}
Error while executing config command with args '--bootstrap-server localhost:9092 --describe --user CN=encrypted-arnost'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_USER_SCRAM_CREDENTIALS
{noformat}
STDOUT contains all the expected data, but the script itself exits with return
code 1 and the error above. SCRAM-SHA has not been configured anywhere in this
case (it is not supported by KRaft). This might be fixed by adding SCRAM-SHA
support in the next version. The issue is not reproducible without KRaft enabled.
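The stack trace shows the failure surfacing as a `java.util.concurrent.ExecutionException` whose cause is the `UnsupportedVersionException`. A minimal Java sketch of how a caller of a future-based API could tell that case apart from other failures; it assumes a locally defined stand-in exception (the real class is `org.apache.kafka.common.errors.UnsupportedVersionException`) and the helper name `getIfSupported` is hypothetical. It is not how `kafka-configs.sh` is implemented.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
class UnsupportedVersionStandIn extends RuntimeException {
    UnsupportedVersionStandIn(String message) {
        super(message);
    }
}

final class OptionalCall {
    private OptionalCall() { }

    // Waits for the future. If its cause is "unsupported version", reports
    // "no result" instead of failing; any other cause is rethrown.
    static <T> Optional<T> getIfSupported(Future<T> future) {
        try {
            return Optional.ofNullable(future.get());
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnsupportedVersionStandIn) {
                return Optional.empty();
            }
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the result", e);
        }
    }
}
```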

 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891613523


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            OptionalLong.of(v.lastFetchTimestamp()),
+                            OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            OptionalLong.of(o.lastFetchTimestamp()),
+                            OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;
+                } catch (Exception e) {

Review Comment:
   `UnknownServerException` extends `KafkaException`, which extends 
`RuntimeException`. So I think all the errors that we are raising above get 
re-thrown in the 

[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551235#comment-17551235
 ] 

Guozhang Wang commented on KAFKA-13939:
---

Once the PR is merged, we can cherry-pick the commit to older branches. But
whether the fix is actually released depends on whether a bug-fix release
(e.g., 3.2.1) is planned.

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262].
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 





[GitHub] [kafka] jnewhouse opened a new pull request, #12263: KAFKA-13939: Only track dirty keys if logging is enabled.

2022-06-07 Thread GitBox


jnewhouse opened a new pull request, #12263:
URL: https://github.com/apache/kafka/pull/12263

   InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys
   that have been seen in order to log them for durability.
   This set is never used nor cleared if logging is not enabled.
   Having it be populated creates a memory leak.
   This change stops populating the set if logging is not enabled.
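A simplified Java sketch of the change described above. `DirtyKeyBuffer` and its methods are hypothetical stand-ins for `InMemoryTimeOrderedKeyValueBuffer` (the real class also tracks sizes, record contexts and changelog writes); it only illustrates the idea of recording dirty keys when there is a changelog that `flush()` will drain them into.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, heavily simplified buffer; NOT the real Kafka Streams class.
final class DirtyKeyBuffer {
    private final boolean loggingEnabled;
    private final TreeMap<Long, String> index = new TreeMap<>(); // time -> key
    private final Set<String> dirtyKeys = new HashSet<>();

    DirtyKeyBuffer(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    void put(long time, String key) {
        index.put(time, key);
        markDirty(key);
    }

    // Evicts every entry with a timestamp strictly before the given time.
    void evictBefore(long time) {
        Iterator<Map.Entry<Long, String>> it = index.headMap(time).entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getValue();
            it.remove();
            markDirty(key); // before the fix, this add was unconditional
        }
    }

    // flush() is the only place dirtyKeys is drained, and only with logging on.
    void flush() {
        if (loggingEnabled) {
            // ... write each dirty key to the changelog here ...
            dirtyKeys.clear();
        }
    }

    // The fix: without a changelog there is nothing to flush, so don't track.
    private void markDirty(String key) {
        if (loggingEnabled) {
            dirtyKeys.add(key);
        }
    }

    int dirtyKeyCount() {
        return dirtyKeys.size();
    }
}
```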
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12262: MINOR: Fix kraft timeout in LogOffsetTest

2022-06-07 Thread GitBox


hachikuji opened a new pull request, #12262:
URL: https://github.com/apache/kafka/pull/12262

   We have been seeing a lot of timeouts in `LogOffsetTest` when KRaft is 
enabled. The problem is the dependence on `MockTime`. In the KRaft broker, we 
depend on having a steadily advancing time for events in `KafkaEventQueue` to 
get executed. In the case of the timeouts, the broker was stuck with the next 
heartbeat event in the queue, which we depended on in order to send the next 
heartbeat and complete the `initialCatchUpFuture`, which is needed to finish 
startup.
   
   As far as I can tell, the test does not have a strong dependence on 
`MockTime`, so I have replaced it with system time.
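The failure mode above can be illustrated with a toy Java queue that runs a deferred event only once an injected clock reaches the event's deadline. `DeadlineQueue` is hypothetical and far simpler than `KafkaEventQueue`; it only shows why a clock that never advances leaves a scheduled heartbeat sitting in the queue, while system time lets it fire on its own.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.LongSupplier;

// Hypothetical deadline-ordered event queue driven by an injected clock.
final class DeadlineQueue {
    private static final class Event {
        final long deadlineMs;
        final Runnable action;

        Event(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private final LongSupplier clockMs;
    private final PriorityQueue<Event> events =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.deadlineMs));

    DeadlineQueue(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    void scheduleAfter(long delayMs, Runnable action) {
        events.add(new Event(clockMs.getAsLong() + delayMs, action));
    }

    // Runs every event whose deadline has been reached; returns how many ran.
    int runDue() {
        int ran = 0;
        while (!events.isEmpty() && events.peek().deadlineMs <= clockMs.getAsLong()) {
            events.poll().action.run();
            ran++;
        }
        return ran;
    }

    int pending() {
        return events.size();
    }
}
```

With a frozen clock, the test itself must advance time for scheduled events to run; with system time, they become due without any help from the test.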
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-07 Thread Jackson Newhouse (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551217#comment-17551217
 ] 

Jackson Newhouse commented on KAFKA-13939:
--

What's the protocol for back-porting a fix like this?



[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-07 Thread Jackson Newhouse (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551215#comment-17551215
 ] 

Jackson Newhouse commented on KAFKA-13939:
--

I'll open a PR.



[GitHub] [kafka] jsancio merged pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool

2022-06-07 Thread GitBox


jsancio merged PR #12245:
URL: https://github.com/apache/kafka/pull/12245





[GitHub] [kafka] mumrah commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool

2022-06-07 Thread GitBox


mumrah commented on code in PR #12245:
URL: https://github.com/apache/kafka/pull/12245#discussion_r891572046


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -97,9 +97,9 @@ object StorageTool extends Logging {
   help("The cluster ID to use.")
 formatParser.addArgument("--ignore-formatted", "-g").
   action(storeTrue())
-formatParser.addArgument("--metadata-version", "-v").

Review Comment:
   That's right, it has not been included in any release. We introduced it in 
1135f22eaf which was pretty recent.






[GitHub] [kafka] jsancio commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool

2022-06-07 Thread GitBox


jsancio commented on code in PR #12245:
URL: https://github.com/apache/kafka/pull/12245#discussion_r891557424


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -97,9 +97,9 @@ object StorageTool extends Logging {
   help("The cluster ID to use.")
 formatParser.addArgument("--ignore-formatted", "-g").
   action(storeTrue())
-formatParser.addArgument("--metadata-version", "-v").

Review Comment:
   We don't need to deprecate this because this version of the storage tool was 
never released, right? `git tag --contains` doesn't show any reference.






[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891554323


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru
         final Call call = new Call(
                 "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
 
-            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
-                Integer partition = 0;
-                String topicName = response.getTopicNameByIndex(0);
-                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
                 List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
                 List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
-                response.getVoterInfo(topicName, partition).forEach(v -> {
+                partition.currentVoters().forEach(v -> {
                     voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
                             v.logEndOffset(),
                             OptionalLong.of(v.lastFetchTimestamp()),
                             OptionalLong.of(v.lastCaughtUpTimestamp())));

Review Comment:
   I went back and forth on this and ended up returning an optional containing
-1 here. I now remember that the original intention was to return an empty
optional. Will address this.
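For context, the code above wraps the raw timestamps with `OptionalLong.of(...)`, so a timestamp that the response reports as -1 comes out as a present value of -1. A minimal Java sketch of the empty-optional alternative, assuming -1 is the only sentinel for "unknown"; the class and helper names (`Timestamps`, `toOptional`) are hypothetical.

```java
import java.util.OptionalLong;

final class Timestamps {
    private Timestamps() { }

    // Assumed sentinel meaning "no timestamp known".
    static final long UNKNOWN = -1L;

    // Maps the sentinel to OptionalLong.empty() instead of OptionalLong.of(-1).
    static OptionalLong toOptional(long rawTimestampMs) {
        return rawTimestampMs == UNKNOWN ? OptionalLong.empty() : OptionalLong.of(rawTimestampMs);
    }
}
```

Callers can then check `isPresent()` instead of comparing against -1.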






[GitHub] [kafka] junrao commented on a diff in pull request #12250: KAFKA-13935 Fix static usages of IBP in KRaft mode

2022-06-07 Thread GitBox


junrao commented on code in PR #12250:
URL: https://github.com/apache/kafka/pull/12250#discussion_r891551766


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1790,38 +1790,13 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
   val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
-  val interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
-
-  val fetchRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_3_1_IV0)) 13
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV1)) 12
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 11
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV2)) 10
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 8
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_1_1_IV0)) 7
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV1)) 5
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV0)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1
-    else 0
-
-  val offsetForLeaderEpochRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV0)) 1
-    else 0
-
-  val listOffsetRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)) 7
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 6
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV1)) 5
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1
-    else 0
+  val interBrokerProtocolVersion = if (processRoles.isEmpty) {

Review Comment:
   @mumrah : Thanks for the explanation. Covering that in a separate PR sounds 
good to me.
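The removed `fetchRequestVersion` chain selects the request version belonging to the highest IBP threshold that the configured version has reached. A table-driven Java sketch of that same lookup, using a hypothetical `Ibp` enum that lists only the thresholds from the chain plus one older version (the real type is `MetadataVersion`, which has many more entries); it is an illustration of the mapping, not the code this PR moves it to.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical subset of inter-broker protocol versions, declared oldest first
// so that the enum's natural ordering matches version ordering.
enum Ibp {
    IBP_0_8_2, IBP_0_9_0, IBP_0_10_0_IV0, IBP_0_10_1_IV1, IBP_0_11_0_IV0, IBP_0_11_0_IV1,
    IBP_1_1_IV0, IBP_2_0_IV1, IBP_2_1_IV2, IBP_2_3_IV1, IBP_2_7_IV1, IBP_3_1_IV0
}

final class FetchVersions {
    private FetchVersions() { }

    // Threshold -> Fetch request version, copied from the if/else chain in the diff.
    private static final TreeMap<Ibp, Short> THRESHOLDS = new TreeMap<>();

    static {
        THRESHOLDS.put(Ibp.IBP_0_9_0, (short) 1);
        THRESHOLDS.put(Ibp.IBP_0_10_0_IV0, (short) 2);
        THRESHOLDS.put(Ibp.IBP_0_10_1_IV1, (short) 3);
        THRESHOLDS.put(Ibp.IBP_0_11_0_IV0, (short) 4);
        THRESHOLDS.put(Ibp.IBP_0_11_0_IV1, (short) 5);
        THRESHOLDS.put(Ibp.IBP_1_1_IV0, (short) 7);
        THRESHOLDS.put(Ibp.IBP_2_0_IV1, (short) 8);
        THRESHOLDS.put(Ibp.IBP_2_1_IV2, (short) 10);
        THRESHOLDS.put(Ibp.IBP_2_3_IV1, (short) 11);
        THRESHOLDS.put(Ibp.IBP_2_7_IV1, (short) 12);
        THRESHOLDS.put(Ibp.IBP_3_1_IV0, (short) 13);
    }

    // The highest threshold <= ibp wins, like the first matching isAtLeast branch.
    static short fetchRequestVersion(Ibp ibp) {
        Map.Entry<Ibp, Short> entry = THRESHOLDS.floorEntry(ibp);
        if (entry == null) {
            return 0; // older than every threshold: the final `else 0`
        }
        return entry.getValue();
    }
}
```

With this shape, a new protocol version would need one map entry rather than another `else if` branch.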






[GitHub] [kafka] jsancio commented on pull request #12236: MINOR: Use right enum value for broker registration change

2022-06-07 Thread GitBox


jsancio commented on PR #12236:
URL: https://github.com/apache/kafka/pull/12236#issuecomment-1148986153

   @dengziming I merged KAFKA-13916. When you have some time, can you take
another look at this change?





[GitHub] [kafka] jsancio merged pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


jsancio merged PR #12240:
URL: https://github.com/apache/kafka/pull/12240


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891518982


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
+} catch (Exception e) {

Review Comment:
   So this block is me trying to have a single `future.completeExceptionally()` 
call in this code block. We are catching `UnknownServerException` and any 
exception 

[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891518561


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+List voters = new ArrayList<>();
+List observers = new ArrayList<>();
+partition.currentVoters().forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+OptionalLong.of(v.lastFetchTimestamp()),
+OptionalLong.of(v.lastCaughtUpTimestamp(;
+});
+partition.observers().forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+OptionalLong.of(o.lastFetchTimestamp()),
+OptionalLong.of(o.lastCaughtUpTimestamp(;
+});
+QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, 
observers);
+return info;
+}
+
+@Override
+DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+return new Builder(DescribeQuorumRequest.singletonRequest(
+new TopicPartition(METADATA_TOPIC_NAME, 
METADATA_TOPIC_PARTITION.partition())));
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
+try {
+if (quorumResponse.data().errorCode() != 
Errors.NONE.code()) {
+throw 
Errors.forCode(quorumResponse.data().errorCode()).exception();
+}
+if (quorumResponse.data().topics().size() > 1) {
+String msg = String.format("DescribeMetadataQuorum 
received {} topics when 1 was expected",
+quorumResponse.data().topics().size());
+log.debug(msg);
+throw new UnknownServerException(msg);
+}
+DescribeQuorumResponseData.TopicData topic = 
quorumResponse.data().topics().get(0);
+if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+String msg = String.format("DescribeMetadataQuorum 
received a topic with name {} when {} was expected",
+topic.topicName(), METADATA_TOPIC_NAME);
+log.debug(msg);
+throw new UnknownServerException(msg);
+}
+if (topic.partitions().size() > 1) {
+String msg = String.format("DescribeMetadataQuorum 
received a topic {} with {} partitions when 1 was expected",
+topic.topicName(), topic.partitions().size());
+log.debug(msg);
+throw new UnknownServerException(msg);
+}
+DescribeQuorumResponseData.PartitionData partition = 
topic.partitions().get(0);
+if (partition.partitionIndex() != 
METADATA_TOPIC_PARTITION.partition()) {
+String msg = String.format("DescribeMetadataQuorum 
received a single partition with index {} when {} was expected",
+partition.partitionIndex(), 
METADATA_TOPIC_PARTITION.partition());
+log.debug(msg);
+throw new UnknownServerException(msg);
+}
+if (partition.errorCode() != Errors.NONE.code()) {
+throw 
Errors.forCode(partition.errorCode()).exception();
+}
+future.complete(createQuorumResult(partition));
+} catch (RuntimeException e) {
+throw e;

Review Comment:
   I added this block because I noticed a Gradle warning suggesting that, with 
the addition of the general `Exception` catch block, some runtime exceptions 
might get hidden. A little 
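A small stand-alone demonstration of the catch ordering under discussion, using only the JDK. With `catch (RuntimeException e)` placed before `catch (Exception e)`, unchecked exceptions never reach the generic clause. In Kafka, `ApiException` subclasses such as `UnknownServerException` are unchecked (they extend `KafkaException`, a `RuntimeException`), so in the quoted `handleResponse` they would take the rethrow path rather than the `catch (Exception e)` branch. To make the routing observable, the sketch returns a label where the quoted code rethrows; `route` and the class name are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

class CatchOrderingDemo {
    // Mirrors the try/catch shape of the quoted handleResponse, but returns a
    // label describing which path was taken instead of rethrowing.
    static String route(Callable<?> body, CompletableFuture<Object> future) {
        try {
            future.complete(body.call());
            return "completed";
        } catch (RuntimeException e) {
            // The quoted code does `throw e;` here: unchecked exceptions skip the
            // generic clause below and never call completeExceptionally().
            return "rethrown " + e.getClass().getSimpleName();
        } catch (Exception e) {
            // Only checked exceptions can reach this clause.
            future.completeExceptionally(e);
            return "failed future " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(route(() -> "ok", new CompletableFuture<>()));
        System.out.println(route(() -> { throw new IllegalStateException(); }, new CompletableFuture<>()));
        System.out.println(route(() -> { throw new IOException(); }, new CompletableFuture<>()));
    }
}
```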

[GitHub] [kafka] mimaison commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


mimaison commented on PR #11779:
URL: https://github.com/apache/kafka/pull/11779#issuecomment-1148919718

   Thanks for the quick updates. I'll try to make another pass tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


mimaison commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891478789


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") 
String connector,
 completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", headers, taskConfigs, forward);
 }
 
+@PUT
+@Path("/{connector}/fence")
+public Response fenceZombies(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean forward,
+ final byte[] requestBody) throws Throwable {
+FutureCallback cb = new FutureCallback<>();
+herder.fenceZombies(connector, cb, 
InternalRequestSignature.fromHeaders(requestBody, headers));
+completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", 
"PUT", headers, requestBody, forward);
+return Response.ok().build();

Review Comment:
   As far as I can tell the other internal endpoint returns 204 so I'd be in 
favor of doing the same here






[GitHub] [kafka] mimaison commented on a diff in pull request #11473: KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation

2022-06-07 Thread GitBox


mimaison commented on code in PR #11473:
URL: https://github.com/apache/kafka/pull/11473#discussion_r891403316


##
docs/ops.html:
##
@@ -1469,33 +1489,38 @@ 

[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891395016


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -784,6 +845,14 @@ private static Map 
connectorClientConfigOverrides(ConnectorTaskI
 return clientOverrides;
 }
 
+private String transactionalId(ConnectorTaskId id) {

Review Comment:
   Ah whoops, those changes were made on 
https://github.com/apache/kafka/pull/11780, which hasn't been merged yet, so a 
rebase isn't going to automatically draw them in. I'll do the change manually 
here but there may be other small changes in not-yet-merged PRs that don't get 
pulled in here. It should be fine as those changes are included in whichever PR 
gets merged last.






[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891380187


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -291,7 +328,16 @@ public void start() {
 log.info("Starting KafkaConfigBackingStore");
 // Before startup, callbacks are *not* invoked. You can grab a 
snapshot after starting -- just take care that
 // updates can continue to occur in the background
-configLog.start();
+try {
+configLog.start();
+} catch (UnsupportedVersionException e) {
+throw new ConnectException(
+"Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
++ "admin clients to read consumer offsets. Disable 
the worker's exactly-once support "
++ "for source connectors, or use a new Kafka 
broker version.",

Review Comment:
   Haha yep, caught and fixed this in an upstream PR that's since been merged. 
Will pick up in the rebase.
   
   
https://github.com/apache/kafka/blob/a6c5a74fdbdce9a992b47706913c920902cda28c/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L323



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -309,16 +355,70 @@ public void start() {
 @Override
 public void stop() {
 log.info("Closing KafkaConfigBackingStore");
-try {
-configLog.stop();
-} finally {
-if (ownTopicAdmin != null) {
-ownTopicAdmin.close();
-}
+
+if (fencableProducer != null) {
+Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), 
"fencable producer for config topic");
 }
+Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
+Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
+
 log.info("Closed KafkaConfigBackingStore");
 }
 
+@Override
+public void claimWritePrivileges() {
+if (usesFencableWriter && fencableProducer == null) {
+try {
+fencableProducer = createFencableProducer();
+fencableProducer.initTransactions();
+} catch (Exception e) {
+if (fencableProducer != null) {
+Utils.closeQuietly(() -> 
fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
+fencableProducer = null;
+}
+throw new ConnectException("Failed to create and initialize 
fencable producer for config topic", e);
+}
+}
+}
+
+private Map baseProducerProps(WorkerConfig workerConfig) {
+Map producerProps = new 
HashMap<>(workerConfig.originals());
+String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(workerConfig);
+producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, 
kafkaClusterId);
+return producerProps;
+}
+
+// Visible for testing
+Map fencableProducerProps(DistributedConfig workerConfig) {
+Map result = new 
HashMap<>(baseProducerProps(workerConfig));
+
+// Always require producer acks to all to ensure durable writes
+result.put(ProducerConfig.ACKS_CONFIG, "all");
+// Don't allow more than one in-flight request to prevent reordering 
on retry (if enabled)
+result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

Review Comment:
   Yep, this got fixed in https://github.com/apache/kafka/pull/11778, which 
just got merged. A rebase should take care of this.






[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891379394


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##
@@ -280,4 +326,4 @@ public int hashCode() {
 inconsistentConnectors,
 configTransformer);
 }
-}
+}

Review Comment:
   🤦



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##
@@ -280,4 +326,4 @@ public int hashCode() {
 inconsistentConnectors,
 configTransformer);
 }
-}
+}

Review Comment:
   🤦 sorry, done.



##
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##
@@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException {
 jaasBasicAuthFilter.filter(requestContext);
 
 verify(requestContext).abortWith(any(Response.class));
-verify(requestContext).getMethod();
+verify(requestContext, atLeastOnce()).getMethod();
 
verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION);
 }
 
 @Test
-public void testPostWithoutAppropriateCredential() throws IOException {
+public void testInternalTaskConfigEndpointSkipped() throws IOException {
+testInternalEndpointSkipped("connectors/connName/tasks");
+}
+
+@Test
+public void testInternalZombieFencingEndpointSkipped() throws IOException {
+testInternalEndpointSkipped("connectors/connName/fence");
+}
+
+private void testInternalEndpointSkipped(String endpoint) throws 
IOException {
 UriInfo uriInfo = mock(UriInfo.class);
-when(uriInfo.getPath()).thenReturn("connectors/connName/tasks");
+when(uriInfo.getPath()).thenReturn(endpoint);
 
 ContainerRequestContext requestContext = 
mock(ContainerRequestContext.class);
 when(requestContext.getMethod()).thenReturn(HttpMethod.POST);

Review Comment:
    this test was broken and did not catch calls to 
`ContainerRequestContext::abort`. I've updated the test to catch those calls 
and, after it started failing, also updated it to use the correct HTTP method. 
Good catch, thanks!






[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891379608


##
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##
@@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException {
 jaasBasicAuthFilter.filter(requestContext);
 
 verify(requestContext).abortWith(any(Response.class));
-verify(requestContext).getMethod();
+verify(requestContext, atLeastOnce()).getMethod();
 
verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION);
 }
 
 @Test
-public void testPostWithoutAppropriateCredential() throws IOException {
+public void testInternalTaskConfigEndpointSkipped() throws IOException {
+testInternalEndpointSkipped("connectors/connName/tasks");
+}
+
+@Test
+public void testInternalZombieFencingEndpointSkipped() throws IOException {
+testInternalEndpointSkipped("connectors/connName/fence");
+}
+
+private void testInternalEndpointSkipped(String endpoint) throws 
IOException {
 UriInfo uriInfo = mock(UriInfo.class);
-when(uriInfo.getPath()).thenReturn("connectors/connName/tasks");
+when(uriInfo.getPath()).thenReturn(endpoint);
 
 ContainerRequestContext requestContext = 
mock(ContainerRequestContext.class);
 when(requestContext.getMethod()).thenReturn(HttpMethod.POST);

Review Comment:
    this test was broken and did not catch calls to 
`ContainerRequestContext::abortWith`. I've updated the test to catch those 
calls and, after it started failing, also updated it to use the correct HTTP 
method. Good catch, thanks!



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -784,6 +845,14 @@ private static Map 
connectorClientConfigOverrides(ConnectorTaskI
 return clientOverrides;
 }
 
+private String transactionalId(ConnectorTaskId id) {

Review Comment:
   Yep, exactly  
   Going to try to do the rebase today, but may not be able to finish by EOD as 
it's going to be fairly involved.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -784,6 +845,14 @@ private static Map 
connectorClientConfigOverrides(ConnectorTaskI
 return clientOverrides;
 }
 
+private String transactionalId(ConnectorTaskId id) {
+return transactionalId(config.groupId(), id.connector(), id.task());
+}
+
+public static String transactionalId(String groupId, String connector, int 
taskId) {

Review Comment:
   It's used in integration tests later on: 
https://github.com/C0urante/kafka/blob/3d65e799925096d519b4adf906be05cba70addeb/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L828
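Making the helper `public static` lets test code compute the transactional ID a task's producer is expected to use without constructing a `Worker`. The helper's body is not shown in the quoted diff; the sketch below assumes the `<groupId>-<connector>-<taskId>` layout described in KIP-618 (to our understanding), and the class name is hypothetical.

```java
class TransactionalIdSketch {
    // Assumed layout, per our reading of KIP-618: <groupId>-<connector>-<taskId>.
    static String transactionalId(String groupId, String connector, int taskId) {
        return groupId + "-" + connector + "-" + taskId;
    }

    public static void main(String[] args) {
        // An integration test could derive the ID it expects to see for task 0.
        System.out.println(transactionalId("connect-cluster", "my-source", 0));
    }
}
```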



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -201,6 +214,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) 
{
 public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
 .field("state", Schema.STRING_SCHEMA)
 .build();
+public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
+.field("tasks", Schema.INT32_SCHEMA)

Review Comment:
   I think given the key format ("tasks-count-connector") this is 
probably fine, and the name of the field is also specified in the KIP. But 
similar to the 200 vs. 204 HTTP response for the fencing endpoint, this is 
internal and a small detail, so I can change it if we agree that this kind of 
detail doesn't need to precisely match what's in the KIP.






[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891379125


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##
@@ -138,6 +138,18 @@ public interface Herder {
  */
 void putTaskConfigs(String connName, List> configs, 
Callback callback, InternalRequestSignature requestSignature);
 
+/**
+ * Fence out any older task generations for a source connector, and then 
write a record to the config topic
+ * indicating that it is safe to bring up a new generation of tasks. If 
that record is already present, do nothing
+ * and invoke the callback successfully.
+ * @param connName the name of the connector to fence out, which must 
refer to a source connector; if the
+ * connector does not exist or is not a source connector, 
the callback will be invoked with an error
+ * @param callback callback to invoke upon completion
+ * @param requestSignature the signature of the request made for this 
connector;
+ * may be null if no signature was provided
+ */
+void fenceZombies(String connName, Callback callback, 
InternalRequestSignature requestSignature);

Review Comment:
   Fine by me  



##
gradle/spotbugs-exclude.xml:
##
@@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+

[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


mumrah commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891371390


##
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##
@@ -17,14 +17,16 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   I actually think we should not increase the record version unless there is 
an incompatible change. To my knowledge, that is what the record version has 
conveyed historically. However, I don't think there's any _harm_ in increasing 
it, and I also don't think we need to solve it in this PR. Let's go with 
Colin's suggestion here and we can continue this discussion on the mailing list 
or offline






[GitHub] [kafka] C0urante commented on pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on PR #11778:
URL: https://github.com/apache/kafka/pull/11778#issuecomment-1148809725

   Thanks Luke, and thanks Tom!





[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


mumrah commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891357405


##
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##
@@ -120,10 +121,16 @@ public AclsImage acls() {
 }
 
 public void write(Consumer> out) {
+// We use the minimum KRaft metadata version if this image does
+// not have a specific version set.
+MetadataVersion metadataVersion = features.metadataVersion();
+if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+metadataVersion = MetadataVersion.IBP_3_0_IV1;
+}

Review Comment:
   Right, once that PR is merged, KRaft will be at metadata version IBP_3_0_IV1 
implicitly until the controller finishes bootstrapping. This will be true on 
the controller and broker side of things
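The fallback in `write` amounts to substituting a floor version whenever the image's version is still unset. A stand-alone sketch with a hypothetical three-value enum standing in for `MetadataVersion`; only `UNINITIALIZED` and `IBP_3_0_IV1` come from the diff, and `LATER_VERSION` is a placeholder for any version that has actually been set.

```java
class MetadataVersionFallback {
    // Hypothetical stand-in for MetadataVersion; the real enum has many more values.
    enum Version { UNINITIALIZED, IBP_3_0_IV1, LATER_VERSION }

    // Use the minimum KRaft metadata version until a real version has been set.
    static Version versionForWrite(Version current) {
        return current == Version.UNINITIALIZED ? Version.IBP_3_0_IV1 : current;
    }

    public static void main(String[] args) {
        System.out.println(versionForWrite(Version.UNINITIALIZED)); // IBP_3_0_IV1
    }
}
```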






[jira] [Commented] (KAFKA-13908) RuntimeException will be thrown in BrokerServer.startup, not the cause of exception

2022-06-07 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551141#comment-17551141
 ] 

Richard Joerger commented on KAFKA-13908:
-

Luke, thanks so much for the quick insight. I'll head down that path. 

> RuntimeException will be thrown in BrokerServer.startup, not the cause of 
> exception
> ---
>
> Key: KAFKA-13908
> URL: https://issues.apache.org/jira/browse/KAFKA-13908
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie
>
> Before [#11969|https://github.com/apache/kafka/pull/11969], we threw an 
> {{ExecutionException(KafkaStorageException)}} in 
> {{BrokerServer.startup}}, and we caught the exception and rethrew its 
> cause with:
> {code:java}
> throw if (e.isInstanceOf[ExecutionException]) e.getCause else e{code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113]
>  
> After [#11969|https://github.com/apache/kafka/pull/11969], we throw a 
> {{RuntimeException(ExecutionException(KafkaStorageException))}}, but the 
> catch logic didn't change. As a result, when the exception is a 
> RuntimeException, we rethrow the whole exception chain instead of only the cause. 
>  
> We should update it and add tests for it.
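One possible shape for the updated catch logic, sketched in Java rather than the Scala of `Kafka.scala`, with `IllegalStateException` standing in for `KafkaStorageException` in the usage below. It unwraps an `ExecutionException`'s cause whether the `ExecutionException` is thrown directly (the pre-#11969 shape) or wrapped in a `RuntimeException` (the post-#11969 shape); anything else is rethrown unchanged. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;

class StartupFailureUnwrapper {
    // Returns the throwable that should be rethrown from startup.
    static Throwable toRethrow(Throwable e) {
        Throwable candidate = e;
        // Post-#11969 shape: RuntimeException(ExecutionException(cause))
        if (candidate instanceof RuntimeException
                && candidate.getCause() instanceof ExecutionException) {
            candidate = candidate.getCause();
        }
        // Pre-#11969 shape (or the unwrapped form above): ExecutionException(cause)
        if (candidate instanceof ExecutionException && candidate.getCause() != null) {
            return candidate.getCause();
        }
        return e;
    }

    public static void main(String[] args) {
        Throwable storageError = new IllegalStateException("log dir failed");
        Throwable wrapped = new RuntimeException(new ExecutionException(storageError));
        System.out.println(toRethrow(wrapped) == storageError); // true
    }
}
```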



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

2022-06-07 Thread GitBox


mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r891335380


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig,
 
 new 
DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
   .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+  .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
   case None =>
 new 
DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
   .setErrorCode(Errors.NONE.code)
+  .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
 }
 
   } catch {
 case e: KafkaStorageException =>
+  e.printStackTrace()

Review Comment:
   Oops, no we obviously don't want that! Thanks, fixed






[GitHub] [kafka] ijuma merged pull request #12259: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-07 Thread GitBox


ijuma merged PR #12259:
URL: https://github.com/apache/kafka/pull/12259





[GitHub] [kafka] mimaison commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


mimaison commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890986946


##
gradle/spotbugs-exclude.xml:
##
@@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+
+

[GitHub] [kafka] ijuma commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


ijuma commented on code in PR #12261:
URL: https://github.com/apache/kafka/pull/12261#discussion_r891281407


##
docs/upgrade.html:
##
@@ -213,6 +213,9 @@ Notable changes in 3
 
 Notable changes in 
3.0.0
 
+Java 8 and Scala 2.12 support have been deprecated since Apache Kafka 
3.0 and will be removed in Apache Kafka 4.0.
+See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223;>KIP-750
+and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218;>KIP-751
 for more details.

Review Comment:
   Maybe move this to the second line since it's less impactful than the 
producer idempotence change.






[GitHub] [kafka] ijuma commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


ijuma commented on code in PR #12261:
URL: https://github.com/apache/kafka/pull/12261#discussion_r891279768


##
docs/ops.html:
##
@@ -1266,7 +1266,8 @@ 6.6 Java Version
 
-  Java 8 and Java 11 are supported. Java 11 performs significantly better if 
TLS is enabled, so it is highly recommended (it also includes a number of other
+  Java 8, Java 11, and Java 17 are supported. Note that Java 8 support has 
been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.
+  Java 11 and later versions performs significantly better if TLS is enabled, 
so it is highly recommended (it also includes a number of other

Review Comment:
   "Java 11 and later versions perform significantly better if TLS is enabled, 
so they are highly recommended (they also include a number of other"






[GitHub] [kafka] cadonna merged pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-07 Thread GitBox


cadonna merged PR #12235:
URL: https://github.com/apache/kafka/pull/12235





[GitHub] [kafka] cadonna commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-07 Thread GitBox


cadonna commented on PR #12235:
URL: https://github.com/apache/kafka/pull/12235#issuecomment-1148710618

   The test failures are unrelated.





[GitHub] [kafka] cadonna commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-07 Thread GitBox


cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r891064878


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##
@@ -166,4 +171,46 @@ public static String extractThreadId(final String 
fullThreadName) {
 final int index = fullThreadName.indexOf("StreamThread-");
 return fullThreadName.substring(index);
 }
+
+public static long producerRecordSizeInBytes(final ProducerRecord record) {
+return recordSizeInBytes(
+record.key().length,
+record.value() == null ? 0 : record.value().length,
+record.topic(),
+record.headers()
+);
+}
+
+public static long consumerRecordSizeInBytes(final ConsumerRecord record) {
+return recordSizeInBytes(
+record.serializedKeySize(),
+record.serializedValueSize(),
+record.topic(),
+record.headers()
+);
+}
+
+public static long recordSizeInBytes(final long keyBytes,

Review Comment:
   nit: Could you make this private? 
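Two null-handling details in the quoted helpers are worth checking: the producer path guards `record.value()` against null but not `record.key()`, and `ConsumerRecord.serializedKeySize()`/`serializedValueSize()` return -1 when the key or value is null. A minimal JDK-only sketch of null-safe size accounting with the shared helper kept `private`, as suggested. The accounting itself (key bytes, value bytes, UTF-8 topic bytes, UTF-8 header-key bytes plus header-value bytes, no fixed per-record overhead) is an assumption, since the quoted diff does not show the body of `recordSizeInBytes`; headers are modelled as a `Map` for simplicity even though Kafka headers may repeat a key. All names here are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

class RecordSizeSketch {
    // Producer side: guard both key and value against null.
    static long producerSize(byte[] key, byte[] value, String topic, Map<String, byte[]> headers) {
        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length, topic, headers);
    }

    // Consumer side: serialized sizes are -1 for a null key or value, so clamp to 0.
    static long consumerSize(int serializedKeySize, int serializedValueSize,
                             String topic, Map<String, byte[]> headers) {
        return recordSize(Math.max(0, serializedKeySize), Math.max(0, serializedValueSize), topic, headers);
    }

    // Kept private, as the review suggests; the accounting here is an assumption.
    private static long recordSize(long keyBytes, long valueBytes, String topic, Map<String, byte[]> headers) {
        long headerBytes = 0;
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            headerBytes += header.getKey().getBytes(StandardCharsets.UTF_8).length;
            if (header.getValue() != null) {
                headerBytes += header.getValue().length;
            }
        }
        return keyBytes + valueBytes + topic.getBytes(StandardCharsets.UTF_8).length + headerBytes;
    }
}
```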



##
streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java:
##
@@ -46,13 +47,15 @@ public  void send(final String topic,
 final Integer partition,
 final Long timestamp,
 final Serializer keySerializer,
-final Serializer valueSerializer) {
+final Serializer valueSerializer,
+final String processorNodeId,
+final InternalProcessorContext 
context) {
 collected.add(new ProducerRecord<>(topic,
-partition,
-timestamp,
-key,
-value,
-headers));
+   partition,
+   timestamp,
+   key,
+   value,
+   headers));

Review Comment:
   nit: The indentation was actually right since it is a method call and not a 
method declaration.
   



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -192,6 +217,28 @@ public  void send(final String topic,
 } else {
 log.warn("Received offset={} in produce response for {}", 
metadata.offset(), tp);
 }
+
+if (!topic.endsWith("-changelog")) {
+// we may not have created a sensor yet if the node uses 
dynamic topic routing

Review Comment:
   This comment is a bit misleading here. AFAIU it refers to the `else`-branch. 
Please move it or remove it. I think you know my preference.






[jira] [Resolved] (KAFKA-13335) Upgrading connect from 2.7.0 to 2.8.0 causes worker instability

2022-06-07 Thread John Gray (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Gray resolved KAFKA-13335.
---
Resolution: Not A Problem

Finally got back to this after a long time. This is not a bug or fault in Kafka 
Connect. We have a lot of connectors, so it takes a while to rebalance all of 
them. We were constantly exceeding {{rebalance.timeout.ms}}, which left us in 
an endless loop of rebalancing. Not sure what changed between 2.7.0 and 2.8.0 
to enforce this timeout or to lengthen the time to rebalance, but something 
did. Bumped the timeout to 3 minutes from 1 minute and we are good to go! 
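For a distributed Connect worker, the setting involved is `rebalance.timeout.ms`, which defaults to 60000 ms (one minute), consistent with the report. A minimal sketch of raising it to three minutes when building worker properties programmatically; the class and helper names are hypothetical, and in a worker properties file the equivalent line would be `rebalance.timeout.ms=180000`.

```java
import java.time.Duration;
import java.util.Properties;

class WorkerRebalanceTimeout {
    // Returns a copy of the worker properties with rebalance.timeout.ms set.
    static Properties withRebalanceTimeout(Properties base, Duration timeout) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("rebalance.timeout.ms", Long.toString(timeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Properties worker = new Properties();
        worker.setProperty("group.id", "connect-cluster");
        Properties tuned = withRebalanceTimeout(worker, Duration.ofMinutes(3));
        System.out.println(tuned.getProperty("rebalance.timeout.ms")); // 180000
    }
}
```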

> Upgrading connect from 2.7.0 to 2.8.0 causes worker instability
> ---
>
> Key: KAFKA-13335
> URL: https://issues.apache.org/jira/browse/KAFKA-13335
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.8.0
>Reporter: John Gray
>Priority: Major
> Attachments: image-2021-09-29-09-15-18-172.png
>
>
> After recently upgrading our connect cluster to 2.8.0 (via 
> strimzi+Kubernetes, brokers are still on 2.7.0), I am noticing that the 
> cluster is struggling to stabilize. Connectors are being 
> unassigned/reassigned/duplicated continuously, and never settling back down. 
> A downgrade back to 2.7.0 fixes things immediately. I have attached a picture 
> of our Grafana dashboards showing some metrics. We have a connect cluster 
> with 4 nodes, trying to maintain about 1000 connectors, each connector with a 
> maxTask of 1. 
> We are noticing a slow increase in memory usage with big random peaks of 
> task counts and thread counts.
> Over the course of letting 2.8.0 run, I also notice a huge increase in logs 
> stating that {code}ERROR Graceful stop of task (task name here) 
> failed.{code}, but the logs do not seem to indicate a reason. The connector 
> appears to be stopped only seconds after its creation. It only appears to 
> affect our source connectors. These logs stop after downgrading back to 2.7.0.
> I am also seeing an increase in logs stating that {code}Couldn't instantiate 
> task (task name) because it has an invalid task configuration. This task will 
> not execute until reconfigured. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
> [StartAndStopExecutor-connect-1-1]
> org.apache.kafka.connect.errors.ConnectException: Task already exists in this 
> worker: (task name)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:512)
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:834){code}
> I am not sure what could be causing this; any insight would be appreciated! 
> I do notice Kafka 2.7.1/2.8.0 contains a bugfix related to connect rebalances 
> (KAFKA-10413). Is that fix potentially causing instability? 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] tombentley commented on pull request #12196: Implement SCRAM support in KRaft

2022-06-07 Thread GitBox


tombentley commented on PR #12196:
URL: https://github.com/apache/kafka/pull/12196#issuecomment-1148690209

   @cmccabe is there a JIRA for this? Are you intending to implement support 
for `DescribeUserScramCredentials` in this PR too, or will that be done in a 
later PR? Thanks!





[GitHub] [kafka] ijuma commented on a diff in pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes

2022-06-07 Thread GitBox


ijuma commented on code in PR #12260:
URL: https://github.com/apache/kafka/pull/12260#discussion_r891237342


##
docs/upgrade.html:
##
@@ -67,6 +67,9 @@ Notable changes in 3
 which meant that idempotence remained disabled unless the user had 
explicitly set enable.idempotence to true
 (See https://issues.apache.org/jira/browse/KAFKA-13598 for 
more details).
 This issue was fixed and the default is properly applied in 3.0.1, 
3.1.1, and 3.2.0.
+
+When the broker version is lower than 2.8.0, and the client 
version is 3.0.1, 3.1.1, and later, the IDEMPOTENT_WRITE permission is required 
to produce data

Review Comment:
   I would say something like: Please read the relevant KIP section for the 
compatibility implications - a noteworthy item worth highlighting is...
   
   Also, we should use the same approach for this and other lines. It's unclear 
to me why we added a `ul` here, but not in the other case. I'd probably include 
this paragraph in the previous `li`






[jira] [Comment Edited] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign

2022-06-07 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551082#comment-17551082
 ] 

Matthew de Detrich edited comment on KAFKA-8420 at 6/7/22 1:20 PM:
---

So, in order to work on this issue, I tried writing a test to reproduce what you 
are describing and came across something interesting. The test that I wrote looks 
like this:
{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);

    KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);

    ConsumerRecords<String, String> initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());

    consumer.unsubscribe();

    consumer.assign(singleton(tp0));
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
    consumer.poll(Duration.ofSeconds(1));
} {code}
The problem I am currently running into is that 
{{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock 
(note that originally I had {{consumer.poll(Duration.ofSeconds(0));}}, however 
this caused the {{consumer.poll}} method to short-circuit due to 
{{timer.notExpired()}} never executing, and hence it immediately returned 
{{ConsumerRecords.empty();}} without the consumer ever sending a request that 
would trigger a sync-group response).

After spending some time debugging, this is the piece of code that does not 
terminate: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
What I find highly confusing is the fact that 
{{lookupCoordinator()}} does actually complete (in this case it immediately 
returns {{findCoordinatorFuture}} at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]),
however for some reason the loop at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215]
never terminates. It doesn't appear to detect that the future has finished, 
which I believe it has? I am not sure if this is related to what you 
mentioned, i.e. 
{quote}In the worst case (i.e. leader keeps sending incompatible assignment), 
this would cause the consumer to fall into endless re-joins.
{quote}
but it looks like I have either found something else or I am barking up 
the wrong tree. Do you have any insights into this, [~guozhang]? 



[jira] [Commented] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign

2022-06-07 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551082#comment-17551082
 ] 

Matthew de Detrich commented on KAFKA-8420:
---

So, in order to work on this issue, I tried writing a test to reproduce what you 
are describing and came across something interesting. The test that I wrote looks 
like this:


{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);

    KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);

    ConsumerRecords<String, String> initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());

    consumer.unsubscribe();

    consumer.assign(singleton(tp0));
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
    consumer.poll(Duration.ofSeconds(1));
} {code}
The problem I am currently running into is that 
{{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock 
(note that originally I had {{consumer.poll(Duration.ofSeconds(0));}}, however 
this caused the {{consumer.poll}} method to short-circuit due to 
{{timer.notExpired()}} never executing, and hence it immediately returned 
{{ConsumerRecords.empty();}} without the consumer ever sending a request that 
would trigger a sync-group response).

After spending some time debugging, this is the piece of code that does not 
terminate: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
What I find highly confusing is the fact that 
{{lookupCoordinator()}} does actually complete (in this case it immediately 
returns {{findCoordinatorFuture}} at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]),
however for some reason the loop at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215]
never terminates. It doesn't appear to detect that the future has finished, 
which I believe it has? I am not sure if this is related to what you 
mentioned, i.e. 
{quote}
In the worst case (i.e. leader keeps sending incompatible assignment), this 
would cause the consumer to fall into endless re-joins.
{quote}
but it looks like I have either found something else or I am barking up 
the wrong tree.

> Graceful handling when consumer switches from subscribe to manual assign
> 
>
> Key: KAFKA-8420
> URL: https://issues.apache.org/jira/browse/KAFKA-8420
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Matthew de Detrich
>Priority: Major
>
> Today if a consumer switches between subscribe (and hence relies on group 
> rebalance to get assignment) and manual assign, it may cause unnecessary 
> rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll(); // join-group request sent, returns empty because of the poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll(); // sync-group response received, and the assigned 
> partitions do not match the current subscription state. In this case it 
> will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending incompatible assignments), this 
> would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling 
> better than the status quo.
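The five steps above can be pictured as a decision that has to be made when a late sync-group assignment arrives. The sketch below is a hypothetical model only. SyncGroupHandlingSketch, its enums, and the "discard if no longer subscribed" policy are illustrative assumptions. They are not the consumer's real internals (SubscriptionState, ConsumerCoordinator) and not the eventual resolution of this ticket:

```java
import java.util.Set;

// Hypothetical model of the KAFKA-8420 scenario; names and policy are assumptions.
final class SyncGroupHandlingSketch {
    enum Mode { SUBSCRIBED, MANUALLY_ASSIGNED, UNSUBSCRIBED }

    enum Action { APPLY_ASSIGNMENT, REJOIN, DISCARD }

    /**
     * Decides what to do with a sync-group assignment that arrives after the
     * join-group round trip started. If the user has since switched to manual
     * assign (or unsubscribed), the group assignment is treated as stale and
     * discarded instead of triggering another re-join.
     */
    static Action onSyncGroupAssignment(Mode mode, Set<String> subscribedTopics, Set<String> assignedTopics) {
        if (mode != Mode.SUBSCRIBED) {
            return Action.DISCARD;
        }
        // Still subscribed: re-join only if the assignment contains topics we do not subscribe to.
        return subscribedTopics.containsAll(assignedTopics) ? Action.APPLY_ASSIGNMENT : Action.REJOIN;
    }

    public static void main(String[] args) {
        // Step 5: unsubscribe() and assign(..) were called before the response arrived.
        System.out.println(onSyncGroupAssignment(Mode.MANUALLY_ASSIGNED, Set.of(), Set.of("topic"))); // DISCARD
    }
}
```

Under this model, the endless re-join loop from the worst case cannot start once the consumer has left subscription mode. Whether discarding is safe against the group coordinator's view of membership is a separate question that the sketch does not address.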





[jira] [Updated] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Tomasz Kaszuba (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Kaszuba updated KAFKA-13963:
---
Component/s: streams

> Topology Description ignores context.forward
> 
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.2
>Reporter: Tomasz Kaszuba
>Priority: Minor
>





[jira] [Created] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Tomasz Kaszuba (Jira)
Tomasz Kaszuba created KAFKA-13963:
--

 Summary: Topology Description ignores context.forward
 Key: KAFKA-13963
 URL: https://issues.apache.org/jira/browse/KAFKA-13963
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.2
Reporter: Tomasz Kaszuba


I have a simple topology:
{code:java}
val topology = new Topology
topology
  .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
  .addProcessor(
    "process",
    new ProcessorSupplier[String, String] {
      override def get(): Processor[String, String] =
        new RecordCollectorProcessor()
    },
    "source"
  ) {code}
And a simple processor that uses context.forward to forward messages:
{code:java}
private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
  override def process(key: String, value: String): Unit =
    context().forward("key", "value", To.child("output"))

  override def close(): Unit = ()
} {code}
When I call topology.describe(), I receive this:
{noformat}
Topologies:
   Sub-topology: 0
    Source: source (topics: [input])
      --> process
    Processor: process (stores: [])
      --> none
      <-- source {noformat}
Ignoring the fact that this will not run, since it will throw a runtime 
exception, why is the To.child ignored?

Taking it one step further, if I add multiple sinks to the topology like so:
{code:java}
val topology = new Topology
topology
  .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
  .addProcessor(
    "process",
    new ProcessorSupplier[String, String] {
      override def get(): Processor[String, String] =
        new ContextForwardProcessor()
    },
    "source"
  )
  .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
  .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process") {code}
but have the processor only output to "output1", this is in no way reflected in 
the described topology graph.

I assume this is by design, since it's a lot more work to interpret what 
context.forward is doing, but when I tried to look for this information in the 
Javadoc I couldn't find it.

 





[jira] [Created] (KAFKA-13962) KRaft StripedReplicaPlacer should handle replicas in controlled shutdown

2022-06-07 Thread David Jacot (Jira)
David Jacot created KAFKA-13962:
---

 Summary: KRaft StripedReplicaPlacer should handle replicas in 
controlled shutdown
 Key: KAFKA-13962
 URL: https://issues.apache.org/jira/browse/KAFKA-13962
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


[KIP-841|https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft]
 added the in-controlled-shutdown state to the quorum controller. The 
StripedReplicaPlacer should be aware of replicas in this state and treat them 
like fenced replicas (i.e. place replicas there only as a last resort).
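A simplified sketch of the "last resort" ordering described here follows. It assumes brokers can be classified into three hypothetical states. The class, enum, and method names are illustrative, and the sketch ignores the rack striping and start-position rotation that the real StripedReplicaPlacer performs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model, not StripedReplicaPlacer's actual code.
final class LastResortPlacementSketch {
    enum BrokerState { ACTIVE, IN_CONTROLLED_SHUTDOWN, FENCED }

    static final class Broker {
        final int id;
        final BrokerState state;

        Broker(int id, BrokerState state) {
            this.id = id;
            this.state = state;
        }
    }

    /** Chooses replicationFactor brokers, using non-ACTIVE brokers only when there are too few ACTIVE ones. */
    static List<Integer> place(List<Broker> brokers, int replicationFactor) {
        if (brokers.size() < replicationFactor) {
            throw new IllegalArgumentException("Not enough brokers for replication factor " + replicationFactor);
        }
        List<Broker> ordered = new ArrayList<>(brokers);
        // List.sort on an ArrayList is stable, so brokers keep their original order within each group.
        ordered.sort(Comparator.comparingInt((Broker b) -> b.state == BrokerState.ACTIVE ? 0 : 1));
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            replicas.add(ordered.get(i).id);
        }
        return replicas;
    }

    public static void main(String[] args) {
        List<Broker> brokers = List.of(
            new Broker(1, BrokerState.FENCED),
            new Broker(2, BrokerState.ACTIVE),
            new Broker(3, BrokerState.IN_CONTROLLED_SHUTDOWN),
            new Broker(4, BrokerState.ACTIVE));
        System.out.println(place(brokers, 3)); // [2, 4, 1]
    }
}
```

Here brokers in controlled shutdown and fenced brokers fall into the same "last resort" group, which matches the intent stated in the issue. A real placer might also prefer one of those two states over the other.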





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891098455


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
 short replicationFactor = topic.replicationFactor() == -1 ?
 defaultReplicationFactor : topic.replicationFactor();
 try {
-List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
 0,
 numPartitions,
 replicationFactor
 ), clusterDescriber);
-for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-int[] r = Replicas.toArray(replicas.get(partitionId));
+for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+List<Integer> replicas = partitions.get(partitionId);
+List<Integer> isr = replicas.stream().
+filter(clusterControl::active).collect(Collectors.toList());
+// We need to have at least one replica in the ISR.
+if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   Sure. https://issues.apache.org/jira/browse/KAFKA-13962. I will do this 
right away.






[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550943#comment-17550943
 ] 

Luke Chen commented on KAFKA-13959:
---

Sorry, I didn't see your last sentence. Thanks for the investigation! Looking 
forward to knowing the root cause! :)

> Controller should unfence Broker with busy metadata log
> ---
>
> Key: KAFKA-13959
> URL: https://issues.apache.org/jira/browse/KAFKA-13959
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
>
> https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
> for the controller to not unfence a broker if the committed offset keeps 
> increasing.
>  
> One solution to this problem is to require the broker to catch up only to the 
> last committed offset as of when it last sent a heartbeat. For example:
>  # Broker sends a heartbeat with current offset of {{Y}}. The last committed 
> offset is {{X}}. The controller remembers this last committed offset; call it 
> {{X'}}.
>  # Broker sends another heartbeat with current offset of {{Z}}. Unfence 
> the broker if {{Z >= X}} or {{Z >= X'}}.
>  
> This change should also set the default for MetadataMaxIdleIntervalMs back to 
> 500.
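The proposed rule can be sketched as follows. This minimal model assumes the controller tracks, per broker, the committed offset X' that it observed at that broker's previous heartbeat. UnfenceRuleSketch and onHeartbeat are hypothetical names, and this is not the QuorumController or heartbeat-manager code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposed unfencing rule; not Kafka's controller implementation.
final class UnfenceRuleSketch {
    // X': the committed offset the controller saw at each broker's previous heartbeat.
    private final Map<Integer, Long> committedAtLastHeartbeat = new HashMap<>();

    /**
     * Handles a heartbeat from brokerId reporting its metadata offset Z while the
     * controller's current committed offset is X. Returns true if the broker may be unfenced.
     */
    boolean onHeartbeat(int brokerId, long brokerOffsetZ, long committedOffsetX) {
        Long previousX = committedAtLastHeartbeat.put(brokerId, committedOffsetX);
        boolean caughtUpToCurrent = brokerOffsetZ >= committedOffsetX;
        boolean caughtUpToPrevious = previousX != null && brokerOffsetZ >= previousX;
        return caughtUpToCurrent || caughtUpToPrevious;
    }

    public static void main(String[] args) {
        UnfenceRuleSketch controller = new UnfenceRuleSketch();
        // First heartbeat: Y = 90 while X = 100, so the broker stays fenced; X' becomes 100.
        System.out.println(controller.onHeartbeat(1, 90, 100));  // false
        // Second heartbeat: Z = 105, but the log has advanced to X = 110.
        // Z >= X' (100), so the broker is unfenced even though it trails the live commit.
        System.out.println(controller.onHeartbeat(1, 105, 110)); // true
    }
}
```

The point of comparing against X' is that the target stops moving. A busy metadata log can keep X ahead of every heartbeat, but it cannot move an offset the controller has already recorded.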





[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550941#comment-17550941
 ] 

Luke Chen commented on KAFKA-13959:
---

[~dengziming], if the heartbeat interval is 10 ms, how could it not be able to 
catch up with no-op records written every 500 ms?






[GitHub] [kafka] d-t-w commented on pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes

2022-06-07 Thread GitBox


d-t-w commented on PR #12260:
URL: https://github.com/apache/kafka/pull/12260#issuecomment-1148497056

   Thanks @showuon, I have updated per your comment.





[GitHub] [kafka] cadonna commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-07 Thread GitBox


cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r891054931


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
 verifySensor(() -> 
ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, 
PROCESSOR_NODE_ID, streamsMetrics));
 }
 
+@Test
+public void shouldGetRecordsAndBytesConsumedSensor() {
+final String recordsMetricNamePrefix = "records-consumed";
+final String bytesMetricNamePrefix = "bytes-consumed";
+final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
+consumedTagMap.put("topic", TOPIC_NAME);
+StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(

Review Comment:
   Oh no! I just prototyped that code since the way it was written before 
didn't make sense with Mockito. I also did not know about closing static mocks in 
Mockito. I am really sorry!






[GitHub] [kafka] soarez commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

2022-06-07 Thread GitBox


soarez commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r891031695


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig,
 
 new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
   .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+  .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
   case None =>
 new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
   .setErrorCode(Errors.NONE.code)
+  .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
 }
 
   } catch {
 case e: KafkaStorageException =>
+  e.printStackTrace()

Review Comment:
   Is this change intended? The exception is already logged in the following 
line.






[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-07 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550912#comment-17550912
 ] 

dengziming commented on KAFKA-13959:


When BrokerLifecycleManager is starting up, it sends a heartbeat every 10 
milliseconds rather than every 2000 milliseconds: 

`scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))`

which is already much smaller than 500 ms, so the cause of this bug is more 
complex; I need more time to investigate.
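For reference, the startup delay quoted here is expressed in nanoseconds. The small sketch below works through the arithmetic, using only java.util.concurrent.TimeUnit. The class name is hypothetical, and the 500 ms figure is the no-op record interval discussed in this ticket:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that only reproduces the arithmetic discussed in this thread.
final class HeartbeatIntervalArithmetic {
    // The startup delay passed to scheduleNextCommunication, in nanoseconds.
    static long startupDelayNs() {
        return TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
    }

    // How many 10 ms startup heartbeats fit into one 500 ms no-op record interval.
    static long heartbeatsPerNoOpInterval() {
        return TimeUnit.MILLISECONDS.toNanos(500) / startupDelayNs();
    }

    public static void main(String[] args) {
        System.out.println(startupDelayNs());            // 10000000
        System.out.println(heartbeatsPerNoOpInterval()); // 50
    }
}
```

So roughly 50 heartbeats are sent per no-op record. This is why heartbeat frequency alone does not seem to explain the broker failing to catch up.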






[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890986175


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult 
describeMetadataQuorum(DescribeMetadataQuoru
 final Call call = new Call(
 "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
 
-private QuorumInfo createQuorumResult(final DescribeQuorumResponse 
response) {
-Integer partition = 0;
-String topicName = response.getTopicNameByIndex(0);
-Integer leaderId = response.getPartitionLeaderId(topicName, 
partition);
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
 List voters = new ArrayList<>();
 List observers = new ArrayList<>();
-response.getVoterInfo(topicName, partition).forEach(v -> {
+partition.currentVoters().forEach(v -> {
 voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
 v.logEndOffset(),
 OptionalLong.of(v.lastFetchTimestamp()),
 OptionalLong.of(v.lastCaughtUpTimestamp())));

Review Comment:
   When `lastFetchTimestamp` or `lastCaughtUpTimestamp` is not provided (i.e. 
equal to -1), don't we want to return an empty option instead of an option 
containing -1?
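A minimal sketch of the suggestion, assuming -1 is the only sentinel used for a missing timestamp. The helper name `fromSentinel` is hypothetical; each `OptionalLong.of(...)` call in `createQuorumResult` could route through something like it.

```java
import java.util.OptionalLong;

/** Hypothetical helper: maps the -1 "not provided" sentinel to an empty OptionalLong. */
final class TimestampOptionals {
    private TimestampOptionals() { }

    static OptionalLong fromSentinel(long timestampMs) {
        // -1 in the response means the field was not provided, so report "absent"
        // rather than an OptionalLong holding a meaningless -1.
        return timestampMs == -1L ? OptionalLong.empty() : OptionalLong.of(timestampMs);
    }
}
```

Callers can then distinguish "never fetched" (`isPresent()` is false) from a real timestamp without knowing about the wire-level sentinel.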



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-07 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890984307


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
 }
 }
 
+@Test
+public void testDescribeMetadataQuorumSuccess() throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   Ack. I was not aware of this bug.






[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890963791


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -485,12 +541,36 @@ Iterator usableBrokers() {
 id -> brokerRegistrations.get(id).rack());
 }
 
+/**
+ * Returns true if the broker is in fenced state; Returns false if it is
+ * not or if it does not exist.
+ */
 public boolean unfenced(int brokerId) {

Review Comment:
   That's right. However, it is used in many places in the tests. I haven't 
found a good way to replace it in tests that is as convenient as this 
predicate. I would keep it.






[GitHub] [kafka] viktorsomogyi commented on pull request #12180: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop

2022-06-07 Thread GitBox


viktorsomogyi commented on PR #12180:
URL: https://github.com/apache/kafka/pull/12180#issuecomment-1148387148

   @hachikuji would you please review this small PR?





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890922496


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -132,15 +143,27 @@ ClusterControlManager build() {
 replicaPlacer = new StripedReplicaPlacer(new Random());
 }
 if (controllerMetrics == null) {
-throw new RuntimeException("You must specify controllerMetrics");
+throw new RuntimeException("You must specify ControllerMetrics");
+}
+if (featureControl == null) {
+featureControl = new FeatureControlManager.Builder().
+setLogContext(logContext).
+setSnapshotRegistry(snapshotRegistry).
+setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+QuorumFeatures.defaultFeatureMap(),
+singletonList(0))).
+setMetadataVersion(MetadataVersion.latest()).
+build();

Review Comment:
   Yeah, I agree. Let me change that.






[GitHub] [kafka] showuon commented on pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


showuon commented on PR #12261:
URL: https://github.com/apache/kafka/pull/12261#issuecomment-1148327640

   @ijuma , please take a look. Thanks.





[GitHub] [kafka] showuon opened a new pull request, #12261: MINOR: add java 8/scala 2.12 deprecation info in doc

2022-06-07 Thread GitBox


showuon opened a new pull request, #12261:
URL: https://github.com/apache/kafka/pull/12261

   Java 8 and Scala 2.12 have been deprecated since v3.0 (KIP-750 and KIP-751). 
We should add a note to the notable changes in v3.0, and also update the `Java 
version` section in the docs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890873379


##
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##
@@ -213,12 +230,30 @@ public String toString() {
 bld.append("}");
 bld.append(", rack=").append(rack);
 bld.append(", fenced=").append(fenced);
+bld.append(", inControlledShutdown=").append(inControlledShutdown);
 bld.append(")");
 return bld.toString();
 }
 
-public BrokerRegistration cloneWithFencing(boolean fencing) {
-return new BrokerRegistration(id, epoch, incarnationId, listeners,
-supportedFeatures, rack, fencing);
+public Optional maybeCloneWith(
+Optional fencingChange,
+Optional inControlledShutdownChange
+) {
+boolean newFenced = fencingChange.orElse(fenced);
+boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
+
+if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
+return Optional.empty();

Review Comment:
   Yeah, you're right. I am not sure why I used this Optional here.






[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

2022-06-07 Thread GitBox


dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890868519


##
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##
@@ -120,10 +121,16 @@ public AclsImage acls() {
 }
 
 public void write(Consumer> out) {
+// We use the minimum KRaft metadata version if this image does
+// not have a specific version set.
+MetadataVersion metadataVersion = features.metadataVersion();
+if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+metadataVersion = MetadataVersion.IBP_3_0_IV1;
+}

Review Comment:
   @mumrah is removing `UNINITIALIZED` in 
https://github.com/apache/kafka/pull/12250. We can remove this logic afterwards.






[GitHub] [kafka] showuon merged pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)

2022-06-07 Thread GitBox


showuon merged PR #11778:
URL: https://github.com/apache/kafka/pull/11778





[GitHub] [kafka] showuon commented on pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)

2022-06-07 Thread GitBox


showuon commented on PR #11778:
URL: https://github.com/apache/kafka/pull/11778#issuecomment-1148284973

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.api.ConsumerBounceTest.testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize()
   ```





[GitHub] [kafka] C0urante commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on PR #11779:
URL: https://github.com/apache/kafka/pull/11779#issuecomment-1148257156

   Thanks Tom, some great catches. I'm going to rebase tomorrow or Thursday, which 
should address the one or two outstanding comments; everything else should now be 
addressed and ready for another round.





[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817727


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);
+return true;
+}
+}
+
+return false;
+}
+
+/**
+ * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+ * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+ * record to the config topic.
+ */
+class ZombieFencing {
+private final String connName;
+private final int tasksToRecord;
+private final int taskGen;
+private final FutureCallback fencingFollowup;
+private final KafkaFuture fencingFuture;
+
+public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+this.connName = connName;
+this.tasksToRecord = tasksToRecord;
+this.taskGen = taskGen;
+this.fencingFollowup = new FutureCallback<>();
+this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+// This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+// the future is already completed. Since that thread is the herder tick thread, we don't need
+// to perform follow-up logic through an additional herder request (and if we tried, it would lead
+// to deadlock)
+addOrRunRequest(
+this::onZombieFencingSuccess,
+fencingFollowup
+);
+awaitFollowup();
+return null;
+});
+}
+
+// Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+// Note that work here will be performed on the herder's tick thread, so it should not block for very long
+private Void onZombieFencingSuccess() throws TimeoutException {
+configBackingStore.refresh(1, TimeUnit.MINUTES);
+configState = configBackingStore.snapshot();
+if (taskGen < configState.taskConfigGeneration(connName)) {
+throw new ConnectRestException(
+Response.Status.CONFLICT.getStatusCode(),
+"Fencing failed because new task configurations were generated for the connector");
+}
+if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {

Review Comment:
   Good point, replaced `configLog.readToEnd().get()` with 
`configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS)`, 
which is used everywhere else in the `KafkaConfigBackingStore` where we read to 
the end of the log to ensure that writes that we just performed have landed. It 
comes with the downside that it makes zombie fencing rounds more frail, but 
that's better than squatting indefinitely on the herder thread.
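The change described above swaps an unbounded `Future.get()` for the timed `Future.get(long, TimeUnit)` overload. A minimal sketch of that pattern, where `BoundedRead.awaitBounded` is a hypothetical helper and `timeoutMs` stands in for a constant such as `READ_TO_END_TIMEOUT_MS`:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper illustrating a bounded wait on a future. */
final class BoundedRead {
    private BoundedRead() { }

    /**
     * Waits at most timeoutMs for the future to complete. A stuck read surfaces as
     * TimeoutException instead of blocking the calling thread indefinitely.
     */
    static <T> T awaitBounded(Future<T> future, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

The trade-off is the one described in the comment: a slow but otherwise healthy read now fails the fencing round with a `TimeoutException`, but the herder thread is never held hostage by it.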
   
   I also fixed another potential blocking issue around this area by shifting 
the call to `onZombieFencingSuccess` (or rather, the registration of it as a 
follow-up to the future returned by `Worker::fenceZombies`) 

[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817572


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);
+return true;
+}
+}
+
+return false;
+}
+
+/**
+ * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+ * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+ * record to the config topic.
+ */
+class ZombieFencing {
+private final String connName;
+private final int tasksToRecord;
+private final int taskGen;
+private final FutureCallback fencingFollowup;
+private final KafkaFuture fencingFuture;
+
+public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+this.connName = connName;
+this.tasksToRecord = tasksToRecord;
+this.taskGen = taskGen;
+this.fencingFollowup = new FutureCallback<>();
+this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+// This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+// the future is already completed. Since that thread is the herder tick thread, we don't need
+// to perform follow-up logic through an additional herder request (and if we tried, it would lead
+// to deadlock)
+addOrRunRequest(
+this::onZombieFencingSuccess,
+fencingFollowup
+);
+awaitFollowup();
+return null;
+});
+}
+
+// Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+// Note that work here will be performed on the herder's tick thread, so it should not block for very long
+private Void onZombieFencingSuccess() throws TimeoutException {
+configBackingStore.refresh(1, TimeUnit.MINUTES);
+configState = configBackingStore.snapshot();
+if (taskGen < configState.taskConfigGeneration(connName)) {
+throw new ConnectRestException(
+Response.Status.CONFLICT.getStatusCode(),
+"Fencing failed because new task configurations were generated for the connector");
+}
+if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {
+throw new ConnectException("Failed to write connector task count record to config topic since worker was fenced out");
+}

Review Comment:
   I pushed a change to https://github.com/apache/kafka/pull/11778 that 
basically does this; will rebase and update the new config topic writes 
introduced in this PR accordingly. One noteworthy difference now is that the 
exception message is always the same regardless of which operation failed; I 
tried to make it generic and user-friendly enough to work with that, but if 
that doesn't work well enough, we can add a message parameter to this method 
and use it as part of the message for the exception 

[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817431


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1745,6 +1958,23 @@ private boolean checkRebalanceNeeded(Callback callback) {
 return false;
 }
 
+/**
+ * Execute the given action and subsequent callback immediately if the current thread is the herder's tick thread,
+ * or use them to create and store a {@link DistributedHerderRequest} on the request queue and return the resulting request
+ * if not.
+ * @param action the action that should be run on the herder's tick thread
+ * @param callback the callback that should be invoked once the action is complete
+ * @return a new {@link DistributedHerderRequest} if one has been created and added to the request queue, and {@code null} otherwise
+ */
+DistributedHerderRequest addOrRunRequest(Callable action, Callback callback) {

Review Comment:
   That works, yeah 



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);

Review Comment:
   This follows the same pattern as 
[AbstractHerder::maybeAddConfigErrors](https://github.com/apache/kafka/blob/09570f2540269cc1196c4c69cc7997d035159d1d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L711),
 which accepts a `Callback` but only invokes it on errors. This is useful if 
you'd like to establish some reusable logic that terminates control flow for a 
method and reports an error to a callback if something goes wrong, but 
otherwise allows control flow to continue and possibly fail later.
   
   I'll take a page out of `AbstractHerder::maybeAddConfigErrors`'s book and 
add Javadocs making note of this fact. 
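A minimal sketch of that "complete the callback only on error" pattern. `ResultCallback` is a hypothetical stand-in for Connect's `Callback` interface, and the null-signature check is a simplified version of the validation in the diff above, not the actual herder code.

```java
/** Hypothetical stand-in for Connect's Callback: receives either an error or a result. */
interface ResultCallback<V> {
    void onCompletion(Throwable error, V result);
}

final class ErrorOnlyCallbackExample {
    private ErrorOnlyCallbackExample() { }

    /**
     * On validation failure, completes the callback with the error and returns true so the
     * caller stops. On success, does NOT touch the callback and returns false, leaving the
     * caller responsible for completing it later.
     */
    static boolean maybeRejectRequest(String signature, ResultCallback<Void> callback) {
        if (signature == null) {
            callback.onCompletion(new IllegalArgumentException("Internal request missing required signature"), null);
            return true;
        }
        return false;
    }

    static void handleRequest(String signature, ResultCallback<Void> callback) {
        if (maybeRejectRequest(signature, callback)) {
            return; // the callback has already been completed with the error
        }
        // Normal processing would go here; it may still fail later and report a different error.
        callback.onCompletion(null, null);
    }
}
```

Either way the callback is completed exactly once: by the validation helper on error, or by the caller after normal processing.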



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid 

[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-07 Thread GitBox


C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817292


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -582,6 +588,59 @@ public boolean startTask(
 }
 }
 
+/**
+ * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+ * for the specified number of source tasks from sending any more records.
+ * @param connName the name of the connector
+ * @param numTasks the number of tasks to fence out
+ * @param connProps the configuration of the connector; may not be null
+ * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+ */
+public KafkaFuture fenceZombies(String connName, int numTasks, Map connProps) {
+return fenceZombies(connName, numTasks, connProps, Admin::create);
+}
+
+// Allows us to mock out the Admin client for testing
+KafkaFuture fenceZombies(String connName, int numTasks, Map connProps, Function, Admin> adminFactory) {
+log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+ClassLoader savedLoader = plugins.currentThreadLoader();
+try {
+String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+final Class connClass = plugins.connectorClass(
+connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+connConfig,
+connClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SOURCE);
+Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   Good catch, done.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -582,6 +588,59 @@ public boolean startTask(
 }
 }
 
+/**
+ * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+ * for the specified number of source tasks from sending any more records.
+ * @param connName the name of the connector
+ * @param numTasks the number of tasks to fence out
+ * @param connProps the configuration of the connector; may not be null
+ * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+ */
+public KafkaFuture fenceZombies(String connName, int numTasks, Map connProps) {
+return fenceZombies(connName, numTasks, connProps, Admin::create);
+}
+
+// Allows us to mock out the Admin client for testing
+KafkaFuture fenceZombies(String connName, int numTasks, Map connProps, Function, Admin> adminFactory) {
+log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+ClassLoader savedLoader = plugins.currentThreadLoader();
+try {
+String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

Review Comment:
   Ah yeah, been toying with that idea for a while but never got around to 
trying it out. Works pretty well in this case; the one wrinkle is that the 
signature for `AutoCloseable::close` includes a checked exception. I've added a 
new (internal) `LoaderSwap` class that implements `AutoCloseable` and removes 
that checked exception to address that.
   
   If this looks good, we can retrofit other parts of the code base to leverage 
it in a follow-up.
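A minimal sketch of the idea, assuming the swap targets the current thread's context class loader. `ContextLoaderSwap` is a hypothetical name, not the `LoaderSwap` class from the PR; the point it illustrates is that overriding `close()` without a `throws` clause lets try-with-resources be used without catching `Exception`.

```java
/**
 * Hypothetical sketch: swaps the current thread's context class loader on construction
 * and restores the previous one on close().
 */
final class ContextLoaderSwap implements AutoCloseable {
    private final ClassLoader savedLoader;

    ContextLoaderSwap(ClassLoader newLoader) {
        Thread current = Thread.currentThread();
        this.savedLoader = current.getContextClassLoader();
        current.setContextClassLoader(newLoader);
    }

    // Narrows AutoCloseable.close(), which declares "throws Exception", so callers
    // using try-with-resources need no catch clause for a checked exception.
    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(savedLoader);
    }
}
```

Usage would look like `try (ContextLoaderSwap swap = new ContextLoaderSwap(connectorLoader)) { ... }`; the previous loader is restored when the block exits, even if the body throws, which replaces the manual save-and-restore in a `finally` block.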



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##
@@ -138,6 +138,17 @@ public interface Herder {
  */
 void putTaskConfigs(String connName, List> configs, Callback callback, InternalRequestSignature requestSignature);
 
+/**
+ * Fence out any older task generations for a source connector, and then write a record to the config topic
+ * indicating