[GitHub] [kafka] liuzhuang2017 commented on pull request #12272: MINOR: Update the README file in examples.

2022-10-11 Thread GitBox


liuzhuang2017 commented on PR #12272:
URL: https://github.com/apache/kafka/pull/12272#issuecomment-1274219825

   @dengziming OK, thanks for your review, I have updated this PR.





[GitHub] [kafka] LiamClarkeNZ commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-10-11 Thread GitBox


LiamClarkeNZ commented on PR #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1274319609

   Hi Konstantine, please note that reverting KAFKA-7077 won't disable
   producer idempotence in Kafka Connect, as it never explicitly enabled or
   disabled it. You'll still need to make changes in code to explicitly
   disable producer idempotence to achieve your aims, whether 7077 is reverted
   or not.
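   
   For anyone following along, here is a minimal sketch of what "explicitly disable and let users
   override" could look like in configuration terms. It only illustrates the standard `producer.`
   prefix Connect applies to worker-level producer settings and the `producer.override.` prefix for
   per-connector overrides; the surrounding class and method names are made up for illustration and
   are not taken from any patch in this thread.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative only: the two places such a setting could live in Connect.
   final class IdempotenceConfigSketch {
   
       // Worker-level: applied to every producer the Connect worker creates.
       static Map<String, String> workerProducerProps() {
           Map<String, String> props = new HashMap<>();
           props.put("producer.enable.idempotence", "false"); // explicit, not relying on the client default
           return props;
       }
   
       // Connector-level: opt a single connector back in, if the worker's
       // override policy permits producer overrides.
       static Map<String, String> connectorOverrideProps() {
           Map<String, String> props = new HashMap<>();
           props.put("producer.override.enable.idempotence", "true");
           return props;
       }
   }
   ```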
   
   On Wed, 23 Mar 2022 at 11:15, Konstantine Karantasis <
   ***@***.***> wrote:
   
   > @Ismael you're bringing up a good point,
   > which I missed in your latest comment before I sent my recent reply.
   >
   > In light of the requirement to explicitly add the IDEMPOTENT_WRITE ACL for
   > Connect workers when talking to Kafka brokers older than 2.8 (which I wasn't
   > aware of), I'd like to suggest changing course here as follows:
   >
   > - Revert KAFKA-7077 from all the branches to which it has been merged.
   > - Return to KIP-318, update it and actually vote for it. @LiamClarkeNZ
   >   you referred to this KIP in the description of this PR, but I missed that
   >   this KIP hasn't been approved and is actually currently marked as inactive.
   >   I think we should raise it again after we update it to include all the
   >   compatibility requirements and have it target the next major version (4.0).
   > - Issue a new PR that will explicitly disable idempotency by default
   >   in Connect and will allow users to override the setting via the worker
   >   and/or the connector configs, like we allow today.
   > - In the same PR, update our docs to say that despite the Kafka
   >   producer enabling idempotency by default in 3.0, due to compatibility
   >   requirements Connect chooses to disable idempotency by default for all the
   >   producers it instantiates.
   >
   





[GitHub] [kafka] tombentley commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-11 Thread GitBox


tombentley commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r992004436


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
##
@@ -56,7 +58,10 @@ public class MirrorClientConfig extends AbstractConfig {
 private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
 public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
 DefaultReplicationPolicy.SEPARATOR_DEFAULT;
-
+
+public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class";
+public static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc).";

Review Comment:
   I wonder if we should mention the constructor signature here as well?



##
clients/src/test/java/org/apache/kafka/clients/admin/FakeForwardingAdmin.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+
+public class FakeForwardingAdmin extends ForwardingAdmin {

Review Comment:
   Does this class need to be in `clients`? It looks like it's only used in `connect/mirror`, so maybe it would be better there?



##
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * ForwardingAdmin is the default value of `forwarding.admin.class` in MM2.
+ * MM2 users who wish to use customized behaviour Admin; they can extend the ForwardingAdmin and override some behaviours
+ * without need to provide a whole implementation of Admin.
+ * The class must have a contractor that accept configuration (Map config) to configure
+ * {@link KafkaAdminClient} and any other needed resource management clients.

Review Comment:
   ```suggestion
 * {@code ForwardingAdmin} is the default value of {@code forwarding.admin.class} in MM2.
 * Users who wish to customize the MM2 behaviour for the creation of topics and access control lists
 * can extend this class without needing to provide a whole implementation of {@code Admin}.
 * The class must have a constructor with signature {@code (Map config)} for configuring
 * a decorated {@link KafkaAdminClient} and any other clients needed for external resource management.
   ```
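   
   For readers of the archive, a minimal sketch of the extension point this Javadoc describes,
   assuming the constructor takes a `Map<String, Object>` (the generic parameters above were lost
   in the archive formatting) and using a made-up class name; this is not code from this PR:
   
   ```java
   package org.example.mirror; // hypothetical package
   
   import java.util.Collection;
   import java.util.Map;
   
   import org.apache.kafka.clients.admin.CreateTopicsOptions;
   import org.apache.kafka.clients.admin.CreateTopicsResult;
   import org.apache.kafka.clients.admin.ForwardingAdmin;
   import org.apache.kafka.clients.admin.NewTopic;
   
   // Delegates everything to the decorated admin client except topic creation,
   // which is also reported to stdout before being forwarded.
   public class AuditingForwardingAdmin extends ForwardingAdmin {
   
       // Constructor signature assumed from the discussion above, so MM2 can
       // instantiate the class reflectively.
       public AuditingForwardingAdmin(Map<String, Object> configs) {
           super(configs);
       }
   
       @Override
       public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
           newTopics.forEach(t -> System.out.println("MM2 creating topic " + t.name()));
           return super.createTopics(newTopics, options);
       }
   }
   ```
   
   MM2 would then be pointed at such a class via the `forwarding.admin.class` setting added in this PR.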




[jira] [Created] (KAFKA-14287) Multiple nodes with kraft combined mode will fail shutdown

2022-10-11 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14287:
-

 Summary: Multiple nodes with kraft combined mode will fail shutdown
 Key: KAFKA-14287
 URL: https://issues.apache.org/jira/browse/KAFKA-14287
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.3.1
Reporter: Luke Chen
Assignee: Luke Chen


Multiple nodes with kraft combined mode (i.e. process.roles='broker,controller') can start up successfully. When shutting down in combined mode, we'll unfence the broker first. When the remaining controller nodes are fewer than the quorum size (i.e. N / 2 + 1), the unfence record will not get committed to the metadata topic. So the broker keeps waiting for the response granting the shutdown and eventually hits a timeout error:

 
{code:java}
2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
java.util.concurrent.TimeoutException: Timeout expired before graceful shutdown 
completed
    at 
org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
    at 
org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
    at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
    at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 {code}





[jira] [Commented] (KAFKA-14287) Multiple nodes with kraft combined mode will fail shutdown

2022-10-11 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615658#comment-17615658
 ] 

Luke Chen commented on KAFKA-14287:
---

[~dengziming] [~jsancio] [~hachikuji], any thoughts about this issue? Firstly, I'd like to confirm that this is a valid use case for multiple combined nodes. If so (I assume it is), we can brainstorm a good solution. Otherwise, we just document that we don't support it.

 

> Multi noded with kraft combined mode will fail shutdown
> ---
>
> Key: KAFKA-14287
> URL: https://issues.apache.org/jira/browse/KAFKA-14287
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Multiple nodes with kraft combined mode (i.e. 
> process.roles='broker,controller') can startup successfully. When shutdown in 
> combined mode, we'll unfence broker first. When the remaining controller 
> nodes are less than quorum size (i.e. N / 2 + 1), the unfence record will not 
> get committed to metadata topic successfully. So the broker will keep waiting 
> for the shutdown granting response and then timeout error:
>  
> {code:java}
> 2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
> RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
> java.util.concurrent.TimeoutException: Timeout expired before graceful 
> shutdown completed
>     at 
> org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
>     at 
> org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>  {code}





[jira] [Updated] (KAFKA-14287) Multiple nodes with kraft combined mode will fail shutdown

2022-10-11 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14287:
--
Description: 
Multiple nodes with kraft combined mode (i.e. process.roles='broker,controller') can start up successfully. When shutting down in combined mode, we'll unfence the broker first. When the remaining controller nodes are fewer than the quorum size (i.e. N / 2 + 1), the unfence record will not get committed to the metadata topic. So the broker keeps waiting for the response granting the shutdown and eventually hits a timeout error:

 
{code:java}
2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
java.util.concurrent.TimeoutException: Timeout expired before graceful shutdown 
completed
    at 
org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
    at 
org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
    at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
    at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 {code}
 

 

To reproduce:
 # start up 2 kraft combined nodes, so we need both nodes to form a quorum
 # shut down any one node; at this point it shuts down successfully because both controllers are still alive when the broker shuts down, so the broker can be granted shutdown
 # shut down the 2nd node; this time the shutdown will be pending, and then time out

  was:
Multiple nodes with kraft combined mode (i.e. 
process.roles='broker,controller') can startup successfully. When shutdown in 
combined mode, we'll unfence broker first. When the remaining controller nodes 
are less than quorum size (i.e. N / 2 + 1), the unfence record will not get 
committed to metadata topic successfully. So the broker will keep waiting for 
the shutdown granting response and then timeout error:

 
{code:java}
2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
java.util.concurrent.TimeoutException: Timeout expired before graceful shutdown 
completed
    at 
org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
    at 
org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
    at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
    at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 {code}


> Multi noded with kraft combined mode will fail shutdown
> ---
>
> Key: KAFKA-14287
> URL: https://issues.apache.org/jira/browse/KAFKA-14287
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Multiple nodes with kraft combined mode (i.e. 
> process.roles='broker,controller') can startup successfully. When shutdown in 
> combined mode, we'll unfence broker first. When the remaining controller 
> nodes are less than quorum size (i.e. N / 2 + 1), the unfence record will not 
> get committed to metadata topic successfully. So the broker will keep waiting 
> for the shutdown granting response and then timeout error:
>  
> {code:java}
> 2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
> RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
> java.util.concurrent.TimeoutException: Timeout expired before graceful 
> shutdown completed
>     at 
> org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
>     at 
> org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>  {code}
>  
>  
> to reproduce:
>  # start up 2 kraft combines nodes, so we need 2 nodes get quorum
>  # shutdown any one node, in this time, it will shutdown successfully because 
> when broker shutdown, the 2 controllers are all alive, so broker can be 
> granted for shutdown
>  # shutdown 2nd node, this time, the shutdown will be pending, and then 
> timeout





[jira] [Updated] (KAFKA-14287) Multiple nodes with kraft combined mode will fail shutdown

2022-10-11 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14287:
--
Description: 
Multiple nodes with kraft combined mode (i.e. process.roles='broker,controller') can start up successfully. When shutting down in combined mode, we'll unfence the broker first. When the remaining controller nodes are fewer than the quorum size (i.e. N / 2 + 1), the unfence record will not get committed to the metadata topic. So the broker keeps waiting for the response granting the shutdown and eventually hits a timeout error:

 
{code:java}
2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
java.util.concurrent.TimeoutException: Timeout expired before graceful shutdown 
completed
    at 
org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
    at 
org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
    at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
    at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 {code}
 

 

To reproduce:
 # start up 2 kraft combined nodes, so we need both nodes to form a quorum
 # shut down any one node; at this point it shuts down successfully because both controllers are still alive when the broker shuts down, so the broker can be granted shutdown
 # shut down the 2nd node; this time the shutdown will be pending because we only have 1 controller left, and in the end it hits a timeout error.

  was:
Multiple nodes with kraft combined mode (i.e. 
process.roles='broker,controller') can startup successfully. When shutdown in 
combined mode, we'll unfence broker first. When the remaining controller nodes 
are less than quorum size (i.e. N / 2 + 1), the unfence record will not get 
committed to metadata topic successfully. So the broker will keep waiting for 
the shutdown granting response and then timeout error:

 
{code:java}
2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
java.util.concurrent.TimeoutException: Timeout expired before graceful shutdown 
completed
    at 
org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
    at 
org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
    at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
    at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 {code}
 

 

to reproduce:
 # start up 2 kraft combines nodes, so we need 2 nodes get quorum
 # shutdown any one node, in this time, it will shutdown successfully because 
when broker shutdown, the 2 controllers are all alive, so broker can be granted 
for shutdown
 # shutdown 2nd node, this time, the shutdown will be pending, and then timeout


> Multi noded with kraft combined mode will fail shutdown
> ---
>
> Key: KAFKA-14287
> URL: https://issues.apache.org/jira/browse/KAFKA-14287
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Multiple nodes with kraft combined mode (i.e. 
> process.roles='broker,controller') can startup successfully. When shutdown in 
> combined mode, we'll unfence broker first. When the remaining controller 
> nodes are less than quorum size (i.e. N / 2 + 1), the unfence record will not 
> get committed to metadata topic successfully. So the broker will keep waiting 
> for the shutdown granting response and then timeout error:
>  
> {code:java}
> 2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
> RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
> java.util.concurrent.TimeoutException: Timeout expired before graceful 
> shutdown completed
>     at 
> org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
>     at 
> org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>  {code}
>  
>  
> to reproduce:
>  # start up 2 kraft combines nodes, so we need 2 nodes get quorum
>  # shutdown any one node, in this time, it will shutdown successfully because 
> when broker shutdown, the 2 controllers are all alive, so broker can be 
> granted for shutdown
>  # shutdown 2

[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-10-11 Thread GitBox


OmniaGM commented on code in PR #12577:
URL: https://github.com/apache/kafka/pull/12577#discussion_r992140426


##
clients/src/test/java/org/apache/kafka/clients/admin/FakeForwardingAdmin.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+
+public class FakeForwardingAdmin extends ForwardingAdmin {

Review Comment:
   hmm, `FakeForwardingAdmin` is here because `ForwardingAdmin` is in `clients/admin`, but I don't mind moving them to `connect/mirror` at this point. They are in `clients/admin` based on suggestions in the KIP discussion on the mailing list, to keep the forwarding admin pluggable in other clients in the future.









[GitHub] [kafka] cadonna opened a new pull request, #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


cadonna opened a new pull request, #12730:
URL: https://github.com/apache/kafka/pull/12730

   Updating the input partitions of tasks also updates the mapping from source 
nodes to input topics in the processor topology within the task. The mapping is 
updated with the topics from the topology metadata. The topology metadata does 
not prefix intermediate internal topics with the application ID. Thus, if a 
standby task has input partitions from an intermediate internal topic the 
update of the mapping in the processor topology leads to an invalid topology 
exception during recycling of a standby task to an active task when the input 
queues are created. This is because the input topics in the processor topology and the input partitions of the task do not match: the former lack the application ID prefix.
   
   The added verification to only update input partitions of standby tasks if 
they really changed avoids the invalid topology exception if the standby task 
only has input partitions from intermediate internal topics since they should 
never change. If the standby task has input partitions from intermediate 
internal topics and external topics subscribed to via a regex pattern, the 
invalid topology exception might still be triggered.
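   
   In a nutshell, the added verification amounts to the guard sketched below (a hand-written illustration with a placeholder interface and names, not the actual TaskManager code):
   
   ```java
   import java.util.Set;
   
   import org.apache.kafka.common.TopicPartition;
   
   final class StandbyTaskUpdateSketch {
   
       // Placeholder for the real task abstraction; only the pieces needed to
       // illustrate the guard are modelled here.
       interface StandbyTaskView {
           Set<TopicPartition> inputPartitions();
           void updateInputPartitions(Set<TopicPartition> newInputPartitions);
       }
   
       // Apply the (side-effecting) update only when the assignment actually
       // differs; standby tasks whose partitions come solely from intermediate
       // internal topics never change, so the broken topology rewrite is skipped.
       static void maybeUpdateInputPartitions(StandbyTaskView task, Set<TopicPartition> newInputPartitions) {
           if (!task.inputPartitions().equals(newInputPartitions)) {
               task.updateInputPartitions(newInputPartitions);
           }
       }
   }
   ```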
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cadonna commented on pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


cadonna commented on PR #12730:
URL: https://github.com/apache/kafka/pull/12730#issuecomment-1274480546

   Call for review: @vpapavas 





[GitHub] [kafka] cadonna commented on a diff in pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


cadonna commented on code in PR #12730:
URL: https://github.com/apache/kafka/pull/12730#discussion_r992179987


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,6 +420,27 @@ private void handleTasksWithoutStateUpdater(final 
Map inputPartitions) {
+/*
+We should only update input partitions of a standby task if the input 
partitions really changed. Updating the
+input partitions of tasks also updates the mapping from source nodes 
to input topics in the processor topology
+within the task. The mapping is updated with the topics from the 
topology metadata. The topology metadata does
+not prefix intermediate internal topics with the application ID. Thus, 
if a standby task has input partitions
+from an intermediate internal topic the update of the mapping in the 
processor topology leads to an invalid
+topology exception during recycling of a standby task to an active 
task when the input queues are created. This
+is because the input topics in the processor topology and the input 
partitions of the task do not match because
+the former miss the application ID prefix.
+For standby task that have only input partitions from intermediate 
internal topics this check avoids the invalid
+topology exception. Unfortunately, a subtopology might have input 
partitions subscribed to with a regex
+additionally intermediate internal topics which might still lead to an 
invalid topology exception during recycling
+irrespectively of this check here. Thus, there is still a bug to fix 
here.

Review Comment:
   I opened the following ticket: 
https://issues.apache.org/jira/browse/KAFKA-14288 






[GitHub] [kafka] lucasbru opened a new pull request, #12731: MINOR: Fix usage instruction of skipSigning build parameter

2022-10-11 Thread GitBox


lucasbru opened a new pull request, #12731:
URL: https://github.com/apache/kafka/pull/12731

   The skipSigning parameter must be set to a boolean value to work.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14288) Processor topology in tasks is updated with internal intermediate topics without application ID prefix

2022-10-11 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-14288:
-

 Summary: Processor topology in tasks is updated with internal 
intermediate topics without application ID prefix
 Key: KAFKA-14288
 URL: https://issues.apache.org/jira/browse/KAFKA-14288
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna


Updating the input partitions of tasks during assignment handling also updates the mapping from source nodes to input topics in the processor topology within the task. The mapping is updated with the topics from the topology metadata. The topology metadata does not prefix internal intermediate topics with the application ID. Thus, if a standby task has input partitions from an internal intermediate topic, the update of the mapping in the processor topology leads to an invalid topology exception during recycling of a standby task to an active task when the input queues are created. This is because the input topics in the processor topology and the input partitions of the task do not match: the former lack the application ID prefix.
The case where standby tasks have input partitions only from internal intermediate topics can be fixed by checking whether the input partitions really changed before updating them (see PR https://github.com/apache/kafka/pull/12730). Unfortunately, a subtopology might have input partitions subscribed to with a regex in addition to internal intermediate topics, which might still lead to an invalid topology exception during recycling, irrespective of the aforementioned verification.

This bug might also affect active tasks.





[GitHub] [kafka] C0urante commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito

2022-10-11 Thread GitBox


C0urante commented on PR #12409:
URL: https://github.com/apache/kafka/pull/12409#issuecomment-1274751737

   Rebased on latest trunk, test failures appear unrelated.
   
   @showuon @mimaison would one of you be able to take a look at this?





[jira] [Created] (KAFKA-14289) Use non zero status code in Kafka CLI when the command failed

2022-10-11 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-14289:
---

 Summary: Use non zero status code in Kafka CLI when the command 
failed
 Key: KAFKA-14289
 URL: https://issues.apache.org/jira/browse/KAFKA-14289
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Nicolas Guyomar


Hi team,

Using the kafka-consumer-groups CLI as an example, running the --reset-offsets option on an active consumer group results in an ERROR on stdout:

_Error: Assignments can only be reset if the group 'console-consumer-12543' is inactive, but the current state is Stable._

but the exit status code is 0, while for any automation work it would be nice to return status code 1.

Was it a design decision to consider the command execution a success (technically it is, because the command ran without a bug) and return 0, rather than interpreting the Kafka response and mapping it to the CLI exit status code?
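
Purely as an illustration of the request (not a proposed patch), the tool's entry point would need to propagate the failure to the shell, for example via Kafka's Exit utility; the surrounding class and the command-running method below are made-up stand-ins:

{code:java}
import org.apache.kafka.common.utils.Exit;

public final class ExitCodeSketch {

    public static void main(String[] args) {
        // Stand-in for the real CLI logic, e.g. the --reset-offsets handling,
        // which would report failure when the broker rejects the operation.
        boolean succeeded = runCommand(args);

        // The ask in this ticket: exit non-zero on failure instead of always 0.
        Exit.exit(succeeded ? 0 : 1);
    }

    private static boolean runCommand(String[] args) {
        return args.length > 0; // placeholder
    }
}
{code}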

Thank you

 





[jira] [Commented] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

2022-10-11 Thread Yu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615898#comment-17615898
 ] 

Yu Wang commented on KAFKA-14266:
-

[~ChrisEgerton] for this issue, since the KafkaConsumer always returns before fetching data from the broker, *select-rate == 0* can be used to alert on it.
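
A rough sketch of that monitoring idea (hand-written illustration; the *select-rate* metric name comes from the consumer's built-in selector metrics, the surrounding class is made up):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

final class SelectRateAlertSketch {

    // If the consumer's select-rate stays at 0, the task is no longer doing
    // network I/O and an external alert can fire.
    static boolean looksStuck(KafkaConsumer<byte[], byte[]> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            if ("select-rate".equals(entry.getKey().name())) {
                Object value = entry.getValue().metricValue();
                return value instanceof Double && (Double) value == 0.0;
            }
        }
        return false;
    }
}
{code}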

> MirrorSourceTask will stop mirroring when get corrupt record
> 
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.2.3
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from TOPIC-261. If needed, please seek past the record to 
> continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition 
> TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored 
> crc = 4289549294, computed crc = 3792599753)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got 
> {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List poll() {
> if (!consumerAccess.tryAcquire()) {
> return null;
> }
> if (stopping) {
> return null;
> }
> try {
> ConsumerRecords records = consumer.poll(pollTimeout);
> List sourceRecords = new ArrayList<>(records.count());
> ...
> if (sourceRecords.isEmpty()) {
> // WorkerSourceTasks expects non-zero batch size
> return null;
> } else {
> log.trace("Polled {} records from {}.", sourceRecords.size(), 
> records.partitions());
> return sourceRecords;
> }
> } catch (WakeupException e) {
> return null;
> } catch (KafkaException e) {
> log.warn("Failure during poll.", e);
> return null;
> } catch (Throwable e)  {
> log.error("Failure during poll.", e);
> // allow Connect to deal with the exception
> throw e;
> } finally {
> consumerAccess.release();
> }
> } {code}
> In the ne

[jira] [Comment Edited] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

2022-10-11 Thread Yu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615898#comment-17615898
 ] 

Yu Wang edited comment on KAFKA-14266 at 10/11/22 3:07 PM:
---

[~ChrisEgerton] for this issue, since the KafkaConsumer always returns before fetching data from the broker, the Kafka consumer's *select-rate == 0* can be used to alert on it.


was (Author: lucentwong):
[~ChrisEgerton] for this issue, as the KafkaConsumer always return before fetch 
data from broker, it can use *select-rate == 0* to alert it.

> MirrorSourceTask will stop mirroring when get corrupt record
> 
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.2.3
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from TOPIC-261. If needed, please seek past the record to 
> continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition 
> TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored 
> crc = 4289549294, computed crc = 3792599753)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got 
> {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List poll() {
> if (!consumerAccess.tryAcquire()) {
> return null;
> }
> if (stopping) {
> return null;
> }
> try {
> ConsumerRecords records = consumer.poll(pollTimeout);
> List sourceRecords = new ArrayList<>(records.count());
> ...
> if (sourceRecords.isEmpty()) {
> // WorkerSourceTasks expects non-zero batch size
> return null;
> } else {
> log.trace("Polled {} records from {}.", sourceRecords.size(), 
> records.partitions());
> return sourceRecords;
> }
> } catch (WakeupException e) {
> return null;
> } catch (KafkaException e) {
> log.warn("Failure during poll.", e);
> return null

[jira] [Resolved] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

2022-10-11 Thread Yu Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Wang resolved KAFKA-14266.
-
Resolution: Works for Me

> MirrorSourceTask will stop mirroring when get corrupt record
> 
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.2.3
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from TOPIC-261. If needed, please seek past the record to 
> continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition 
> TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored 
> crc = 4289549294, computed crc = 3792599753)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got 
> {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List poll() {
> if (!consumerAccess.tryAcquire()) {
> return null;
> }
> if (stopping) {
> return null;
> }
> try {
> ConsumerRecords records = consumer.poll(pollTimeout);
> List sourceRecords = new ArrayList<>(records.count());
> ...
> if (sourceRecords.isEmpty()) {
> // WorkerSourceTasks expects non-zero batch size
> return null;
> } else {
> log.trace("Polled {} records from {}.", sourceRecords.size(), 
> records.partitions());
> return sourceRecords;
> }
> } catch (WakeupException e) {
> return null;
> } catch (KafkaException e) {
> log.warn("Failure during poll.", e);
> return null;
> } catch (Throwable e)  {
> log.error("Failure during poll.", e);
> // allow Connect to deal with the exception
> throw e;
> } finally {
> consumerAccess.release();
> }
> } {code}
> In the next poll round, the consumer will keep throwing exception because it 
> has received a corrupt record. Which makes the  *MirrorSourceTask* cannot get 
> next records and 

[jira] [Closed] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

2022-10-11 Thread Yu Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Wang closed KAFKA-14266.
---

> MirrorSourceTask will stop mirroring when get corrupt record
> 
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.2.3
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from TOPIC-261. If needed, please seek past the record to 
> continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition 
> TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored 
> crc = 4289549294, computed crc = 3792599753)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got 
> {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List poll() {
> if (!consumerAccess.tryAcquire()) {
> return null;
> }
> if (stopping) {
> return null;
> }
> try {
> ConsumerRecords records = consumer.poll(pollTimeout);
> List sourceRecords = new ArrayList<>(records.count());
> ...
> if (sourceRecords.isEmpty()) {
> // WorkerSourceTasks expects non-zero batch size
> return null;
> } else {
> log.trace("Polled {} records from {}.", sourceRecords.size(), 
> records.partitions());
> return sourceRecords;
> }
> } catch (WakeupException e) {
> return null;
> } catch (KafkaException e) {
> log.warn("Failure during poll.", e);
> return null;
> } catch (Throwable e)  {
> log.error("Failure during poll.", e);
> // allow Connect to deal with the exception
> throw e;
> } finally {
> consumerAccess.release();
> }
> } {code}
> In the next poll round, the consumer will keep throwing exception because it 
> has received a corrupt record. Which makes the  *MirrorSourceTask* cannot get 
> next records and be blocked on the same offset.
> 

[GitHub] [kafka] niket-goel commented on pull request #12709: KAFKA-14275: KRaft Controllers should crash after failing to apply any metadata record

2022-10-11 Thread GitBox


niket-goel commented on PR #12709:
URL: https://github.com/apache/kafka/pull/12709#issuecomment-1274898442

   Two different tests failing this time (timeout and metric not updated)
   ```
   org.apache.kafka.common.network.SslTransportLayerTest
   org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
   ```





[GitHub] [kafka] guozhangwang commented on pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


guozhangwang commented on PR #12730:
URL: https://github.com/apache/kafka/pull/12730#issuecomment-1274908187

   Thanks @cadonna , taking a look now.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


guozhangwang commented on code in PR #12730:
URL: https://github.com/apache/kafka/pull/12730#discussion_r992508509


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,6 +420,27 @@ private void handleTasksWithoutStateUpdater(final 
Map inputPartitions) {
+/*
+We should only update input partitions of a standby task if the input 
partitions really changed. Updating the
+input partitions of tasks also updates the mapping from source nodes 
to input topics in the processor topology
+within the task. The mapping is updated with the topics from the 
topology metadata. The topology metadata does
+not prefix intermediate internal topics with the application ID. Thus, 
if a standby task has input partitions
+from an intermediate internal topic the update of the mapping in the 
processor topology leads to an invalid
+topology exception during recycling of a standby task to an active 
task when the input queues are created. This
+is because the input topics in the processor topology and the input 
partitions of the task do not match because
+the former miss the application ID prefix.
+For standby task that have only input partitions from intermediate 
internal topics this check avoids the invalid
+topology exception. Unfortunately, a subtopology might have input 
partitions subscribed to with a regex
+additionally intermediate internal topics which might still lead to an 
invalid topology exception during recycling
+irrespectively of this check here. Thus, there is still a bug to fix 
here.

Review Comment:
   We may also change intermediate topic num.partitions soon so we need to 
revisit this bug asap, cc @ableegoldman who's looking into it.
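
   For readers skimming the thread, the guard being merged here is roughly of this shape 
(the names are assumptions for illustration, not the actual `TaskManager` code):
   
   ```java
   // Sketch with assumed names (not the real Streams code): only rewrite the standby
   // task's input partitions, and hence its source-node-to-topic mapping, when the
   // newly assigned partitions actually differ from the current ones.
   import java.util.Set;
   import org.apache.kafka.common.TopicPartition;

   final class UpdateIfChangedSketch {
       interface StandbyTaskView {
           Set<TopicPartition> inputPartitions();
           void updateInputPartitions(Set<TopicPartition> newInputPartitions);
       }

       static void maybeUpdateInputPartitions(final StandbyTaskView task,
                                              final Set<TopicPartition> newInputPartitions) {
           if (!task.inputPartitions().equals(newInputPartitions)) {
               task.updateInputPartitions(newInputPartitions);
           }
       }
   }
   ```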



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


cadonna commented on PR #12730:
URL: https://github.com/apache/kafka/pull/12730#issuecomment-1274945280

   Failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=zk
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12730: HOTFIX: Only update input partitions of standby tasks if they really changed

2022-10-11 Thread GitBox


cadonna merged PR #12730:
URL: https://github.com/apache/kafka/pull/12730


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #12726: MINOR; Some sections are missing in the TOC

2022-10-11 Thread GitBox


jsancio commented on code in PR #12726:
URL: https://github.com/apache/kafka/pull/12726#discussion_r992560080


##
docs/toc.html:
##
@@ -111,22 +117,28 @@
 Multi-Tenancy and Geo-Replication
 Further 
considerations
 
+
 6.5 Important Configs
 
 Important Client 
Configs
 A Production Server 
Configs
 
+
 6.6 Java Version
 6.7 Hardware and OS
 
-OS
-Disks and Filesystems
-Application vs OS Flush 
Management
-Linux Flush Behavior
-Ext4 Notes
+OS
+Disks and Filesystems
+Application vs OS Flush 
Management
+Linux Flush Behavior
+Filesystem 
Selection
+Replace KRaft Controller 
Disk
 
+
 6.8 Monitoring
 
+Security Considerations for 
Remote Monitoring using JMX
+KRaft Monitoring 
Metrics

Review Comment:
   Yes. Removed metrics from the title.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12726: MINOR; Some sections are missing in the TOC

2022-10-11 Thread GitBox


jsancio commented on PR #12726:
URL: https://github.com/apache/kafka/pull/12726#issuecomment-1274984481

   Thanks for the review @showuon . Merging, cherry-picking to 3.3 and 
submitting PR to kafka-site.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #12726: MINOR; Some sections are missing in the TOC

2022-10-11 Thread GitBox


jsancio merged PR #12726:
URL: https://github.com/apache/kafka/pull/12726


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #12709: KAFKA-14275: KRaft Controllers should crash after failing to apply any metadata record

2022-10-11 Thread GitBox


hachikuji merged PR #12709:
URL: https://github.com/apache/kafka/pull/12709


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12355: KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector

2022-10-11 Thread GitBox


C0urante commented on code in PR #12355:
URL: https://github.com/apache/kafka/pull/12355#discussion_r992564573


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -95,4 +96,18 @@ public void stop() {
 public ConfigDef config() {
 return CONFIG_DEF;
 }
+
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map props) {

Review Comment:
   I have plans to add a dedicated section for exactly-once support to our 
docs; I think it'd be better to put that information there since it's a 
fairly-advanced concept and probably wouldn't be too useful for people just 
getting up to speed with the Connector API. Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

2022-10-11 Thread GitBox


C0urante commented on PR #12366:
URL: https://github.com/apache/kafka/pull/12366#issuecomment-1274990097

   @tombentley If you have a moment, would you mind taking a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14275) KRaft Controllers should crash after failing to apply any metadata record

2022-10-11 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel resolved KAFKA-14275.

Fix Version/s: 3.3
 Reviewer: Jason Gustafson
   Resolution: Fixed

> KRaft Controllers should crash after failing to apply any metadata record 
> --
>
> Key: KAFKA-14275
> URL: https://issues.apache.org/jira/browse/KAFKA-14275
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Niket Goel
>Assignee: Niket Goel
>Priority: Major
> Fix For: 3.3
>
>
> When replaying records on a standby controller, any error encountered will 
> halt further processing of that batch. Currently we log an error and allow 
> the controller to continue normal operation. In contrast a similar error on 
> the active controller causes it to halt and exit the jvm. This is 
> inconsistent behavior as nothing prevents a standby from eventually becoming 
> the active controller (even when it had skipped over a record batch). We 
> should halt the process in the case of a standby controller as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji merged pull request #12634: KAFKA-14225: Fix deadlock caused by lazy val exemptSensor

2022-10-11 Thread GitBox


hachikuji merged PR #12634:
URL: https://github.com/apache/kafka/pull/12634


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

2022-10-11 Thread GitBox


nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r991332250


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
   final StateDirectory stateDirectory,
   final TaskType taskType) {
 // if EOS is enabled, wipe out the whole state store for unclean close 
since it is now invalid
-final boolean wipeStateStore = !closeClean && eosEnabled;
+final boolean wipeStateStore = !closeClean && eosEnabled && 
!stateMgr.transactional();

Review Comment:
   If there are transactional and non-transactional stores in the same Task, 
any corruption in that Task will cause _all_ state in that Task to be wiped, 
including the transactional stores, because there's no other way to recover the 
non-transactional stores.
   
   For this reason, it's likely very important that every store within the same 
sub-topology is configured with the same transactionality.
   
   Maybe we should include a warning in 
`ProcessorStateManager#transactional()` (and 
`GlobalStateManager#transactional()`) when there's a mix of stores with 
different transactionality?
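   
   A rough sketch of such a warning, using assumed names rather than the actual 
`ProcessorStateManager` internals:
   
   ```java
   // Rough sketch (assumed names, not the actual ProcessorStateManager API): warn when a
   // task mixes transactional and non-transactional stores, since any corruption would
   // still wipe all of the task's state.
   import java.util.Collection;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   final class MixedTransactionalityCheck {
       private static final Logger log = LoggerFactory.getLogger(MixedTransactionalityCheck.class);

       interface StoreView {
           String name();
           boolean transactional();
       }

       static void warnIfMixed(final Collection<StoreView> stores) {
           final long txnCount = stores.stream().filter(StoreView::transactional).count();
           if (txnCount != 0 && txnCount != stores.size()) {
               log.warn("Task mixes transactional and non-transactional state stores; "
                   + "an unclean close will wipe all of them.");
           }
       }
   }
   ```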



##
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore> implements KeyValueStore {
+private static final byte MODIFICATION = 0x1;
+private static final byte DELETION = 0x2;
+private static final byte[] DELETION_VAL = {DELETION};
+
+private StateStoreContext context;
+
+static final String PREFIX = "transactional-";
+//VisibleForTesting
+public static final String TMP_SUFFIX = ".tmp";
+
+private final Set openIterators = 
Collections.synchronizedSet(new HashSet<>());
+
+Map configs;
+File stateDir;
+
+private boolean consistencyEnabled = false;
+private Position position;
+protected OffsetCheckpoint positionCheckpoint;
+
+KeyValueSegment createTmpStore(final String segmentName,
+   final String windowName,
+   final long segmentId,
+   final RocksDBMetricsRecorder 
metricsRecorder) {
+return new KeyValueSegment(segmentName + TMP_SUFFIX,
+windowName,
+segmentId,
+metricsRecorder);
+}
+
+public abstract T mainStore();
+
+public abstract KeyValueSegment tmpStore();
+
+@Deprecated
+@Override
+public void init(fi

[GitHub] [kafka] C0urante commented on pull request #12543: KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments

2022-10-11 Thread GitBox


C0urante commented on PR #12543:
URL: https://github.com/apache/kafka/pull/12543#issuecomment-1275066936

   Thanks @jolshan. I believe it's valid to perform preferred leader election 
for topics even if they're currently undergoing reassignment, for reasons 
[discussed in an earlier 
PR](https://github.com/apache/kafka/pull/9302#discussion_r491094781) for this 
ticket by @hachikuji:
   > During a reassignment, the adding replicas are always listed first which 
means the preferred leader is among the target replicas. My take is that we 
want to move the leadership onto the new preferred leader as soon as possible 
since the whole point of the reassignment is to take load off the removing 
replicas.
   
   I've double-checked the code base and this still appears to be the case; let 
me know if I'm missing something, though.
   
   Definitely agree RE commenting the test code; @songnon if you have time 
would you mind adding some details? Explanation of why we're shutting down and 
restarting which brokers when (e.g., to create a stuck reassignment, or to 
create a pending preferred leader election) should be plenty.
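   
   As a concrete, made-up illustration of that replica ordering:
   
   ```java
   // Made-up example: a partition being reassigned from replicas {1, 2, 3} to {4, 5, 6}.
   // The adding replicas are listed first, so the preferred leader (the first replica in
   // the assignment) is already one of the target replicas.
   import java.util.List;

   public class PreferredLeaderExample {
       public static void main(final String[] args) {
           final List<Integer> replicasDuringReassignment = List.of(4, 5, 6, 1, 2, 3);
           final int preferredLeader = replicasDuringReassignment.get(0);
           System.out.println("Preferred leader: " + preferredLeader); // 4, a target replica
       }
   }
   ```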


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12667: KAFKA-10635: Improve logging with throwing OOOSE

2022-10-11 Thread GitBox


guozhangwang commented on PR #12667:
URL: https://github.com/apache/kafka/pull/12667#issuecomment-1275079800

   ping @nicktelford , have you been able to try out this patch and collect the 
improved logs?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-10-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616014#comment-17616014
 ] 

Greg Harris commented on KAFKA-14102:
-

From a cursory investigation, this appears to be a pitfall of using 
dynamically-loaded security providers in a multi-classloader environment, and 
does not seem specific to the kafka clients.
I also don't immediately see any way to replace the Sasl/Security classes with 
classloader-aware implementations, though I have a feeling it's possible.

There are a couple of different workarounds that you may consider trying on 
your end, if you haven't already found a workaround:
 * Disable co-location of apps which use Kafka inside the JVM
 * Move the Kafka Client libraries into a single shared ClassLoader for all of 
the apps
 * Repackage the authentication.* classes into a different jar and move those 
to a single shared classloader for all of the apps

I'll remove the KafkaConnect label, as this affects all users of clients, and 
not Connect in any way particularly.
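
If it helps with debugging, a minimal check (not part of the Kafka clients) to confirm 
whether the two apps resolve the SASL classes from different classloaders:

{code:java}
// Minimal diagnostic sketch (not part of the Kafka clients): run this from inside each
// web app, with kafka-clients on the classpath. If the two apps print different
// ClassLoaders, the OAUTHBEARER classes are being registered and looked up across
// separate classloaders.
public class ClassLoaderCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> loginModule = Class.forName(
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule");
        System.out.println(loginModule.getName() + " loaded by " + loginModule.getClassLoader());
    }
}
{code}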

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.Ne

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-10-11 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14102:

Component/s: (was: KafkaConnect)

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator

[GitHub] [kafka] nicktelford commented on pull request #12667: KAFKA-10635: Improve logging with throwing OOOSE

2022-10-11 Thread GitBox


nicktelford commented on PR #12667:
URL: https://github.com/apache/kafka/pull/12667#issuecomment-1275111955

   Hey @guozhangwang, we deployed this today, but we need to wait a couple of 
days for our app to recover before we can test a leadership election. I'm 
hoping I'll be able to do this towards the end of the week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14290) Fix bugs that could block KRaft controlled shutdown indefinitely

2022-10-11 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14290:


 Summary: Fix bugs that could block KRaft controlled shutdown 
indefinitely
 Key: KAFKA-14290
 URL: https://issues.apache.org/jira/browse/KAFKA-14290
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji opened a new pull request, #12732: MINOR: Fix incorrect example in feature command help

2022-10-11 Thread GitBox


hachikuji opened a new pull request, #12732:
URL: https://github.com/apache/kafka/pull/12732

   When using `kafka-features.sh` with the `--feature` parameter, we expect a 
numeric feature level (e.g. `metadata.version=5`). The help example suggests 
that we can also use a descriptive version string for `metadata.version`, which 
doesn't work.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2022-10-11 Thread Akhilesh Chaganti (Jira)
Akhilesh Chaganti created KAFKA-14291:
-

 Summary: KRaft: ApiVersionsResponse doesn't have finalizedFeatures 
and finalizedFeatureEpoch in KRaft mode
 Key: KAFKA-14291
 URL: https://issues.apache.org/jira/browse/KAFKA-14291
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Akhilesh Chaganti
 Fix For: 3.4.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14292) KRaft broker controlled shutdown can be delayed indefinitely

2022-10-11 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14292:
---

 Summary: KRaft broker controlled shutdown can be delayed 
indefinitely
 Key: KAFKA-14292
 URL: https://issues.apache.org/jira/browse/KAFKA-14292
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Alyssa Huang


We noticed when rolling a kraft cluster that it took an unexpectedly long time 
for one of the brokers to shut down. In the logs, we saw the following:
{code:java}
Oct 11, 2022 @ 17:53:38.277  [Controller 1] The request from broker 8 to shut down can not yet be granted because the lowest active offset 2283357 is not greater than the broker's shutdown offset 2283358.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:38.277  [Controller 1] Updated the controlled shutdown offset for broker 8 to 2283362.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:40.278  [Controller 1] Updated the controlled shutdown offset for broker 8 to 2283366.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:40.278  [Controller 1] The request from broker 8 to shut down can not yet be granted because the lowest active offset 2283361 is not greater than the broker's shutdown offset 2283362.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:42.279  [Controller 1] The request from broker 8 to shut down can not yet be granted because the lowest active offset 2283365 is not greater than the broker's shutdown offset 2283366.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:42.279  [Controller 1] Updated the controlled shutdown offset for broker 8 to 2283370.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:44.280  [Controller 1] The request from broker 8 to shut down can not yet be granted because the lowest active offset 2283369 is not greater than the broker's shutdown offset 2283370.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
Oct 11, 2022 @ 17:53:44.281  [Controller 1] Updated the controlled shutdown offset for broker 8 to 2283374.  org.apache.kafka.controller.BrokerHeartbeatManager  DEBUG
{code}
From what I can tell, it looks like the controller waits until all brokers 
have caught up to the {{controlledShutdownOffset}} of the broker that is 
shutting down before allowing it to proceed. Probably the intent is to make 
sure they have all the leader and ISR state.

The problem is that the {{controlledShutdownOffset}} seems to be updated after 
every heartbeat that the controller receives: 
https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L1996.
 Unless all other brokers can catch up to that offset before the next heartbeat 
from the shutting down broker is received, then the broker remains in the 
shutting down state indefinitely.

In this case, it took more than 40 minutes before the broker completed shutdown:
{code:java}
Oct 11, 2022 @ 18:36:36.105  [Controller 1] The request from broker 8 to shut down has been granted since the lowest active offset 2288510 is now greater than the broker's controlled shutdown offset 2288510.  org.apache.kafka.controller.BrokerHeartbeatManager  INFO
Oct 11, 2022 @ 18:40:35.197  [Controller 1] The request from broker 8 to unfence has been granted because it has caught up with the offset of it's register broker record 2288906.  org.apache.kafka.controller.BrokerHeartbeatManager  INFO
{code}
It seems like the bug here is that we should not keep updating 
{{controlledShutdownOffset}} if it has already been set.
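
A minimal sketch of that guard, with assumed names rather than the actual 
BrokerHeartbeatManager code (the exact comparison used when granting shutdown is 
also an assumption here):

{code:java}
// Sketch with assumed names: set the controlled shutdown offset only once, so later
// heartbeats from the shutting-down broker cannot keep moving the target ahead of the
// offsets the other brokers have replayed.
import java.util.OptionalLong;

final class ControlledShutdownOffsetSketch {
    private OptionalLong controlledShutdownOffset = OptionalLong.empty();

    void maybeSetControlledShutdownOffset(long currentCommittedOffset) {
        if (!controlledShutdownOffset.isPresent()) {
            controlledShutdownOffset = OptionalLong.of(currentCommittedOffset);
        }
    }

    boolean mayGrantShutdown(long lowestActiveOffset) {
        return controlledShutdownOffset.isPresent()
            && lowestActiveOffset >= controlledShutdownOffset.getAsLong();
    }
}
{code}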



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2022-10-11 Thread Akhilesh Chaganti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akhilesh Chaganti updated KAFKA-14291:
--
Description: 
https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
```
class SimpleApiVersionManager(
  val listenerType: ListenerType,
  val enabledApis: collection.Set[ApiKeys],
  brokerFeatures: Features[SupportedVersionRange]
) extends ApiVersionManager {

  def this(listenerType: ListenerType) = {
this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
BrokerFeatures.defaultSupportedFeatures())
  }

  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)

  override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse 
= {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
apiVersions, brokerFeatures)
  }
}

```

ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
finalizedFeatureEpoch to the ApiVersionsResponse.

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and 
> finalizedFeatureEpoch in KRaft mode
> -
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Priority: Critical
> Fix For: 3.4.0
>
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
> this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
> BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = 
> ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): 
> ApiVersionsResponse = {
> ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
> apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
> finalizedFeatureEpoch to the ApiVersionsResponse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-10-11 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Attachment: image-2022-10-12-08-39-01-004.png

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
> Attachments: image-2022-10-12-08-39-01-004.png
>
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internal

[jira] [Updated] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-10-11 Thread Shuo Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
--
Attachment: image-2022-10-12-08-43-57-597.png

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
> Attachments: image-2022-10-12-08-39-01-004.png, 
> image-2022-10-12-08-43-57-597.png
>
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.

[jira] [Commented] (KAFKA-14102) (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

2022-10-11 Thread Shuo Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616105#comment-17616105
 ] 

Shuo Chen commented on KAFKA-14102:
---

[~gharris1727] You are right. My workaround is to move the kafka-client 
library (along with the libraries it depends on) out of applications A and B and 
put the libraries in tomcat/lib/, so the classes are loaded by the shared 
parent classloader. That solved the issue.

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> 
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Shuo Chen
>Priority: Blocker
> Attachments: image-2022-10-12-08-39-01-004.png, 
> image-2022-10-12-08-43-57-597.png
>
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clien

[GitHub] [kafka] niket-goel opened a new pull request, #12733: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

2022-10-11 Thread GitBox


niket-goel opened a new pull request, #12733:
URL: https://github.com/apache/kafka/pull/12733

   Make all faults in metadata processing on standby controllers be fatal. This 
is the same behavior-wise as the active controller. This prevents a standby 
controller from eventually becoming active with incomplete state.
   
   Reviewers: Colin Patrick McCabe , Jason Gustafson 

   (cherry picked from commit 98a3dcb477914dcf0c1adc4f7494db9e9a55bd3e)
   
   Conflicts in the following files around availability of Builders for some 
test classes:
   
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
   
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14287) Multi noded with kraft combined mode will fail shutdown

2022-10-11 Thread Deng Ziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616128#comment-17616128
 ] 

Deng Ziming commented on KAFKA-14287:
-

This is a problem. Currently the Raft dissertation assumes that more than half of 
the nodes are always alive, whereas in this case that can't be ensured, so we 
should have our own logic for it.
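
For concreteness, the majority size involved works out as follows:

{code:java}
// Tiny illustration of the quorum arithmetic: with N voters the majority is N / 2 + 1
// (integer division), so a 2-node combined cluster needs both controllers alive to
// commit the records needed for a graceful shutdown.
public class QuorumSizeExample {
    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println("voters=" + n + " majority=" + (n / 2 + 1));
        }
    }
}
{code}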

> Multi noded with kraft combined mode will fail shutdown
> ---
>
> Key: KAFKA-14287
> URL: https://issues.apache.org/jira/browse/KAFKA-14287
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Multiple nodes with kraft combined mode (i.e. 
> process.roles='broker,controller') can startup successfully. When shutdown in 
> combined mode, we'll unfence broker first. When the remaining controller 
> nodes are less than quorum size (i.e. N / 2 + 1), the unfence record will not 
> get committed to metadata topic successfully. So the broker will keep waiting 
> for the shutdown granting response and then timeout error:
>  
> {code:java}
> 2022-10-11 18:01:14,341] ERROR [kafka-raft-io-thread]: Graceful shutdown of 
> RaftClient failed (kafka.raft.KafkaRaftManager$RaftIoThread)
> java.util.concurrent.TimeoutException: Timeout expired before graceful 
> shutdown completed
>     at 
> org.apache.kafka.raft.KafkaRaftClient$GracefulShutdown.failWithTimeout(KafkaRaftClient.java:2408)
>     at 
> org.apache.kafka.raft.KafkaRaftClient.maybeCompleteShutdown(KafkaRaftClient.java:2163)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2230)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>  {code}
>  
>  
> to reproduce:
>  # start up 2 kraft combines nodes, so we need 2 nodes get quorum
>  # shutdown any one node, in this time, it will shutdown successfully because 
> when broker shutdown, the 2 controllers are all alive, so broker can be 
> granted for shutdown
>  # shutdown 2nd node, this time, the shutdown will be pending because we only 
> have 1 controller left, and in the end, timeout error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2022-10-11 Thread Deng Ziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616137#comment-17616137
 ] 

Deng Ziming commented on KAFKA-14291:
-

Thank you [~akhileshchg] for reporting this.

This problem is similar to KAFKA-13990. I had already found this problem, and I 
also found that it only occurs in the KRaft controllers; the brokers are not 
affected by it. So it currently has no impact, since the finalizedFeatures are 
not used in the KRaft controller. 

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and 
> finalizedFeatureEpoch in KRaft mode
> -
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Priority: Critical
> Fix For: 3.4.0
>
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
> this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
> BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = 
> ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): 
> ApiVersionsResponse = {
> ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
> apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
> finalizedFeatureEpoch to the ApiVersionsResponse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming commented on a diff in pull request #12272: MINOR: Update the README file in examples.

2022-10-11 Thread GitBox


dengziming commented on code in PR #12272:
URL: https://github.com/apache/kafka/pull/12272#discussion_r992924582


##
examples/README:
##
@@ -2,11 +2,11 @@ This directory contains examples of client code that uses 
kafka.
 
 To run the demo:
 
-   1. Start Zookeeper and the Kafka server
+   1. Start Zookeeper and the Kafka server.Note, it's unnecessary to start 
Zookeeper in KRaft mode.

Review Comment:
   I think this can still be improved like this:
   `In Zk mode, start Zookeeper and the Kafka server; in KRaft mode, start the 
Kafka server.`
   Also note that we should have a space between punctuation and a word.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] liuzhuang2017 commented on pull request #12272: MINOR: Update the README file in examples.

2022-10-11 Thread GitBox


liuzhuang2017 commented on PR #12272:
URL: https://github.com/apache/kafka/pull/12272#issuecomment-1275512545

   @dengziming, thanks for your review, I have updated this PR according to your 
suggestion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12272: MINOR: Update the README file in examples.

2022-10-11 Thread GitBox


dengziming commented on code in PR #12272:
URL: https://github.com/apache/kafka/pull/12272#discussion_r992983902


##
examples/README:
##
@@ -2,11 +2,11 @@ This directory contains examples of client code that uses 
kafka.
 
 To run the demo:
 
-   1. Start Zookeeper and the Kafka server
+   1. In Zookeeper mode, Start Zookeeper and the Kafka server. In KRaft mode, 
start the Kafka server. Note, it's unnecessary to start Zookeeper in KRaft mode.

Review Comment:
   Thanks for the update @liuzhuang2017. It's a little weird to mention that 
"it's unnecessary to start Zookeeper in KRaft mode" here since we already wrote 
how to operate in Zk mode. I think it's better to remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] liuzhuang2017 commented on pull request #12272: MINOR: Update the README file in examples.

2022-10-11 Thread GitBox


liuzhuang2017 commented on PR #12272:
URL: https://github.com/apache/kafka/pull/12272#issuecomment-1275577470

   @dengziming, thanks for your review, I have updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12355: KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector

2022-10-11 Thread GitBox


yashmayya commented on code in PR #12355:
URL: https://github.com/apache/kafka/pull/12355#discussion_r992989023


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -95,4 +96,18 @@ public void stop() {
 public ConfigDef config() {
 return CONFIG_DEF;
 }
+
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map props) {

Review Comment:
   Thanks Chris, I think that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org