[GitHub] [kafka] chernyih commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases

2022-05-10 Thread GitBox


chernyih commented on PR #12131:
URL: https://github.com/apache/kafka/pull/12131#issuecomment-1123211008

   Retry backoff doesn't help. `KafkaAdminClient` fetches metadata from the 
bootstrap server with, say, a call timeout of 5s. Within those 5s, 
`NetworkClient` can reconnect to the bootstrap server multiple times, paced 
only by the reconnect backoff. After 5s, the fetch metadata call times out 
because the bootstrap server node is not ready. Since the last fetch metadata 
attempt was 5 seconds ago (which should be more than the retry backoff), 
`KafkaAdminClient` will retry fetching metadata.
   
   Another case: if a client uses `NetworkClient` but doesn't 
implement retry backoff, then retry backoff has no effect at all.
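
   To make the timing concrete, here is a rough, self-contained sketch (not Kafka code) that counts how many connection attempts fit inside one 5s call timeout when the reconnect backoff stays fixed versus when it grows exponentially. The 50 ms base and 1000 ms cap are assumed values for illustration only, every attempt is assumed to fail instantly, and jitter is ignored.

```java
class ReconnectAttempts {
    // Counts connection attempts that start within callTimeoutMs, assuming every
    // attempt fails instantly and the client waits the current backoff in between.
    // Hypothetical model: not taken from NetworkClient.
    static int attempts(long callTimeoutMs, long baseMs, long maxMs, boolean exponential) {
        int count = 0;
        long now = 0;
        long backoff = baseMs;
        while (now < callTimeoutMs) {
            count++;            // one connection attempt starts at time `now`
            now += backoff;     // wait out the backoff before the next attempt
            if (exponential) {
                backoff = Math.min(backoff * 2, maxMs);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("fixed:       " + attempts(5000, 50, 1000, false));
        System.out.println("exponential: " + attempts(5000, 50, 1000, true));
    }
}
```

   Under these assumptions the fixed backoff allows 100 attempts within the call timeout, while the exponential one allows 9.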


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level

2022-05-10 Thread GitBox


ijuma commented on code in PR #12097:
URL: https://github.com/apache/kafka/pull/12097#discussion_r869841007


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionMap {
+
+final Map topicPartitions = new 
HashMap<>();
+
+TxnPartitionEntry get(TopicPartition topicPartition) {

Review Comment:
   I removed the `Partition` suffix from this and `getOrCreate`.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionBookkeeper {

Review Comment:
   Done.






[GitHub] [kafka] ijuma commented on pull request #12144: MINOR: reload4j build dependency fixes

2022-05-10 Thread GitBox


ijuma commented on PR #12144:
URL: https://github.com/apache/kafka/pull/12144#issuecomment-1123142383

   Merged to trunk and cherry-picked to 3.2 and 3.1 branches.





[GitHub] [kafka] ijuma commented on a diff in pull request #12146: MINOR: Remove kraft authorizer from list of missing features

2022-05-10 Thread GitBox


ijuma commented on code in PR #12146:
URL: https://github.com/apache/kafka/pull/12146#discussion_r869838507


##
config/kraft/README.md:
##
@@ -114,9 +114,7 @@ We don't support any kind of upgrade right now, either to 
or from KRaft mode.  T
 
 Finally, the following Kafka features have not yet been fully implemented:
 
-* Support for certain security features: configuring a KRaft-based Authorizer, 
setting up SCRAM, delegation tokens, and so forth
-  (although note that you can use authorizers such as 
`kafka.security.authorizer.AclAuthorizer` with KRaft clusters, even
-  if they are ZooKeeper-based: simply define `authorizer.class.name` and 
configure the authorizer as you normally would).
+* Support for certain security features: setting up SCRAM, delegation tokens, 
and so forth.
 * Support for some configurations, like enabling unclean leader election by 
default or dynamically changing broker endpoints

Review Comment:
   @jsancio did you fix the unclean leader election thing with your KIP?






[GitHub] [kafka] showuon commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-10 Thread GitBox


showuon commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1123139779

   @dengziming , thanks for the update!
   @divijvaidya , if you have no other comments, I'm going to merge this PR 
within this week. 
   Thank you.





[GitHub] [kafka] C0urante commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed

2022-05-10 Thread GitBox


C0urante commented on PR #12041:
URL: https://github.com/apache/kafka/pull/12041#issuecomment-1123137976

   @guozhangwang to be clear, nobody is advocating that we call `ignore` on 
everything. I was proposing that we call `ignore` on everything that's already 
defined, which is pretty clear if you read the code example I gave. Does that 
clear things up?





[GitHub] [kafka] dengziming commented on a diff in pull request #12062: KAFKA-13833: Remove the min_version_level from the finalized version written to ZooKeeper

2022-05-10 Thread GitBox


dengziming commented on code in PR #12062:
URL: https://github.com/apache/kafka/pull/12062#discussion_r869835226


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -86,15 +86,15 @@ class DefaultApiVersionManager(
 throttleTimeMs,
 interBrokerProtocolVersion.highestSupportedRecordVersion,
 supportedFeatures,
-finalizedFeatures.features,
+finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,

Review Comment:
   Yes, we should convert it manually since scala won't do it automatically.






[GitHub] [kafka] ijuma merged pull request #12144: MINOR: reload4j build dependency fixes

2022-05-10 Thread GitBox


ijuma merged PR #12144:
URL: https://github.com/apache/kafka/pull/12144





[GitHub] [kafka] dengziming commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-10 Thread GitBox


dengziming commented on PR #12104:
URL: https://github.com/apache/kafka/pull/12104#issuecomment-1123116129

   Hello @vamossagar12, sorry for the changes. We added 
`TestUtils.ensureConsistentKRaftMetadata` in #12108 to fix this kind of 
flakiness. It waits until the brokers have caught up to the controller 
metadata topic end offset instead of waiting for something very specific. 
You can just use this method for KRaft mode and keep it as it was for zk mode.
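
   The general idea of waiting for brokers to catch up to an end offset can be sketched as below. This is only an illustration of the polling pattern, not the actual `TestUtils.ensureConsistentKRaftMetadata` implementation; the class name, the method name, and the use of `LongSupplier` for offsets are all hypothetical.

```java
import java.util.List;
import java.util.function.LongSupplier;

class CatchUpWait {
    // Polls until every broker offset has reached the controller's current end
    // offset, or fails once timeoutMs has elapsed. All names are hypothetical.
    static void waitUntilCaughtUp(LongSupplier controllerEndOffset,
                                  List<LongSupplier> brokerOffsets,
                                  long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            long target = controllerEndOffset.getAsLong();
            boolean caughtUp = brokerOffsets.stream()
                .allMatch(offset -> offset.getAsLong() >= target);
            if (caughtUp) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(
                    "Brokers did not reach offset " + target + " within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(10); // short pause between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

   Waiting on one general condition like this avoids having each test wait for its own specific piece of state.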





[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-10 Thread GitBox


dengziming commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1123114260

   Hello @showuon @divijvaidya, #12108 has been merged. We added 
`TestUtils.ensureConsistentKRaftMetadata` to fix this whole class of flakiness, 
PTAL.





[GitHub] [kafka] ijuma commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level

2022-05-10 Thread GitBox


ijuma commented on code in PR #12097:
URL: https://github.com/apache/kafka/pull/12097#discussion_r869795034


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionBookkeeper {

Review Comment:
   Sounds good, will rename.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level

2022-05-10 Thread GitBox


hachikuji commented on code in PR #12097:
URL: https://github.com/apache/kafka/pull/12097#discussion_r869776847


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionBookkeeper.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+class TxnPartitionBookkeeper {

Review Comment:
   I think basically we're providing a high level view into a map. How about 
`TxnPartitionMap`?






[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading

2022-05-10 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r869766209


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   sensor
 }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = 
{
+if (channel != null) {
+  debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+  dec(listenerName, channel.socket.getInetAddress)
+  closeSocket(channel, this)

Review Comment:
   Update: I have changed it so that `closeChannel` takes a `Logging` argument 
(which will be either the Acceptor or Processor). This should lead to the log 
message about the closure being logged under either the Acceptor or Processor, as 
it was before this PR.






[jira] [Resolved] (KAFKA-7527) Enable Dependency Injection for Kafka Streams handlers (KIP-378)

2022-05-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7527.

Resolution: Fixed

> Enable Dependency Injection for Kafka Streams handlers (KIP-378)
> 
>
> Key: KAFKA-7527
> URL: https://issues.apache.org/jira/browse/KAFKA-7527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wladimir Schmidt
>Priority: Minor
>  Labels: kip, usability
>
> Implement solution proposed in the KIP-378 (Enable Dependency Injection for 
> Kafka Streams handlers).
> Link to 
> [KIP-378|https://cwiki.apache.org/confluence/display/KAFKA/KIP-378:+Enable+Dependency+Injection+for+Kafka+Streams+handlers]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mjsax commented on a diff in pull request #12122: WIP: Upgrade tests for KAFKA-13769

2022-05-10 Thread GitBox


mjsax commented on code in PR #12122:
URL: https://github.com/apache/kafka/pull/12122#discussion_r869751965


##
streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -63,21 +80,30 @@ public static void main(final String[] args) throws 
Exception {
 }));
 }
 
-private static  ProcessorSupplier printProcessorSupplier() {
+private static void buildFKTable(final KStream 
primaryTable,
+final KTable otherTable) {

Review Comment:
   nit: indentation



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -37,6 +37,7 @@
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+fk_join_versions = [str(V_2_4_0)]

Review Comment:
   We only test with latest bug-fix releases, so we can use `LATEST_2_4` here.



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -37,6 +37,7 @@
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+fk_join_versions = [str(V_2_4_0)]

Review Comment:
   Should we test all versions 2.4 - 3.1 ?






[GitHub] [kafka] ijuma commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases

2022-05-10 Thread GitBox


ijuma commented on PR #12131:
URL: https://github.com/apache/kafka/pull/12131#issuecomment-1122986117

   Thanks for the PR. One question: did retry backoff help in this case?





[GitHub] [kafka] mjsax commented on a diff in pull request #12135: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-10 Thread GitBox


mjsax commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r869501138


##
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##
@@ -19,15 +19,52 @@
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
 import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This interface controls the strategy that can be used to control how we 
emit results in a processor.
  */
 public interface EmitStrategy {
 
+Logger log = LoggerFactory.getLogger(EmitStrategy.class);
+
 enum StrategyType {
-ON_WINDOW_CLOSE,
-ON_WINDOW_UPDATE
+ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()),
+ON_WINDOW_CLOSE(1, new WindowCloseStrategy());
+
+private final short code;
+private final EmitStrategy strategy;
+
+private short code() {
+return this.code;
+}
+
+private EmitStrategy strategy() {
+return this.strategy;
+}
+
+StrategyType(final int code, final EmitStrategy strategy) {
+this.code = (short) code;
+this.strategy = strategy;
+}
+
+private final static Map TYPE_TO_STRATEGY = new 
HashMap<>();
+
+static {
+for (final StrategyType type : StrategyType.values()) {
+if (TYPE_TO_STRATEGY.put(type.code(), type.strategy()) != null)
+throw new IllegalStateException("Code " + type.code() + " 
for type " +

Review Comment:
   Never seen anything like this before -- is it best practice to have a guard 
like this?
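
   For reference, the pattern in question can be shown in isolation. The sketch below uses a made-up enum (`EmitKind`, `forCode`, and `BY_CODE` are hypothetical names, not part of the PR) whose static initializer builds a code-to-constant lookup and fails fast if two constants were accidentally given the same code.

```java
import java.util.HashMap;
import java.util.Map;

enum EmitKind {
    ON_UPDATE(0),
    ON_CLOSE(1);

    private final short code;

    EmitKind(int code) {
        this.code = (short) code;
    }

    short code() {
        return code;
    }

    private static final Map<Short, EmitKind> BY_CODE = new HashMap<>();

    static {
        // Map.put returns the previous value for the key, so a non-null result
        // means two constants share one code (for example after a copy-paste).
        for (EmitKind kind : values()) {
            EmitKind previous = BY_CODE.put(kind.code, kind);
            if (previous != null) {
                throw new IllegalStateException(
                    "Code " + kind.code + " is used by both " + previous + " and " + kind);
            }
        }
    }

    static EmitKind forCode(short code) {
        EmitKind kind = BY_CODE.get(code);
        if (kind == null) {
            throw new IllegalArgumentException("Unknown code " + code);
        }
        return kind;
    }
}
```

   The guard costs one check per constant at class-initialization time and turns a silent overwrite in the map into an immediate failure.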



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public abstract class AbstractKStreamTimeWindowAggregateProcessor extends ContextualProcessor, Change> {
+
+private final Time time = Time.SYSTEM;
+private final String storeName;
+private final EmitStrategy emitStrategy;
+private final boolean sendOldValues;
+protected final TimeTracker timeTracker = new TimeTracker();
+
+private TimestampedTupleForwarder, VAgg> tupleForwarder;
+protected TimestampedWindowStore windowStore;
+protected Sensor 

[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-10 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Miroś reassigned KAFKA-13817:
---

Assignee: (was: Jakub Miroś)

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is scheduled only once, a sudden large jump in system time can 
> trigger an emit on every record.
>  
> For example, 
>  # nextTimeToEmit is set to 1 and the step is 1
>  # If the system time then jumps to 100, we will emit on each of the next 100 records
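
The example above can be reproduced with a small model. This is a simplified sketch under stated assumptions, not the actual `KStreamKStreamJoin` code: `EmitScheduler`, `shouldEmit`, and `countEmits` are hypothetical names, and the system time is assumed to stay at 100 while the records arrive.

```java
class EmitScheduler {
    private final long stepMs;
    private final boolean rescheduleFromNow;
    private long nextTimeToEmit;

    EmitScheduler(long firstEmitMs, long stepMs, boolean rescheduleFromNow) {
        this.nextTimeToEmit = firstEmitMs;
        this.stepMs = stepMs;
        this.rescheduleFromNow = rescheduleFromNow;
    }

    // Returns true if a record observed at nowMs should trigger an emit.
    boolean shouldEmit(long nowMs) {
        if (nowMs < nextTimeToEmit) {
            return false;
        }
        // Either advance the schedule by one step (the behaviour described in
        // the issue) or re-anchor it on the current system time (the proposal).
        nextTimeToEmit = rescheduleFromNow ? nowMs + stepMs : nextTimeToEmit + stepMs;
        return true;
    }

    // Feeds `records` records, all observed at nowMs, and counts the emits.
    static int countEmits(boolean rescheduleFromNow, int records, long nowMs) {
        EmitScheduler scheduler = new EmitScheduler(1, 1, rescheduleFromNow);
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (scheduler.shouldEmit(nowMs)) {
                emits++;
            }
        }
        return emits;
    }
}
```

With 200 records at system time 100, advancing by one step per emit yields 100 emits in a row, while re-anchoring on the system time yields a single emit.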





[jira] [Commented] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534600#comment-17534600
 ] 

Andrew Grant commented on KAFKA-13892:
--

Merged in 
https://github.com/apache/kafka/commit/040b11d70594e0499e96014e17a307366b640444

> Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
> 
>
> Key: KAFKA-13892
> URL: https://issues.apache.org/jira/browse/KAFKA-13892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
>  we loop through the ACL filters and add RemoveAccessControlEntryRecord 
> records to the response list for each matching ACL. I think there's a bug 
> where if two filters match the same ACL, we create two 
> RemoveAccessControlEntryRecord records for that same ACL. This is an issue 
> because upon replay we throw an exception 
> (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
>  if the ACL is not in the in-memory data structures which will happen to the 
> second RemoveAccessControlEntryRecord.
> Maybe we can just de-dupe both List and 
> List? I think something like (just showing code for 
> ApiMessageAndVersion):
> {code:java}
> private List<ApiMessageAndVersion> 
> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
>     return new HashSet<>(messages).stream().collect(Collectors.toList());
> }{code}
> should suffice as I don't think the ordering matters within the list of 
> response objects.  
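
A generic variant of the de-dupe idea from the description is sketched below. It is an illustration, not the merged fix: the `Dedupe` class and `deDupe` method are hypothetical, and `LinkedHashSet` is swapped in for `HashSet` so that first-occurrence order is kept in case ordering turns out to matter after all. Like the original snippet, it relies on the element type implementing `equals` and `hashCode`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class Dedupe {
    // Removes duplicates while keeping the first occurrence of each element
    // in its original position. Hypothetical helper for illustration.
    static <T> List<T> deDupe(List<T> items) {
        return new ArrayList<>(new LinkedHashSet<>(items));
    }
}
```

For example, `Dedupe.deDupe(Arrays.asList("a", "b", "a"))` returns a list containing "a" and "b" in that order.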





[jira] [Resolved] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant resolved KAFKA-13892.
--
Resolution: Fixed

> Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
> 
>
> Key: KAFKA-13892
> URL: https://issues.apache.org/jira/browse/KAFKA-13892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
>  we loop through the ACL filters and add RemoveAccessControlEntryRecord 
> records to the response list for each matching ACL. I think there's a bug 
> where if two filters match the same ACL, we create two 
> RemoveAccessControlEntryRecord records for that same ACL. This is an issue 
> because upon replay we throw an exception 
> (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
>  if the ACL is not in the in-memory data structures which will happen to the 
> second RemoveAccessControlEntryRecord.
> Maybe we can just de-dupe both List and 
> List? I think something like (just showing code for 
> ApiMessageAndVersion):
> {code:java}
> private List<ApiMessageAndVersion> 
> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
>     return new HashSet<>(messages).stream().collect(Collectors.toList());
> }{code}
> should suffice as I don't think the ordering matters within the list of 
> response objects.  





[jira] [Assigned] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-10 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Miroś reassigned KAFKA-13817:
---

Assignee: Jakub Miroś

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Jakub Miroś
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is scheduled only once, a sudden large jump in system time can 
> trigger an emit on every record.
>  
> For example, 
>  # nextTimeToEmit is set to 1 and the step is 1
>  # If the system time then jumps to 100, we will emit on each of the next 100 records





[jira] [Resolved] (KAFKA-13879) Exponential backoff for reconnect does not work

2022-05-10 Thread Chern Yih Cheah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chern Yih Cheah resolved KAFKA-13879.
-
Resolution: Fixed

> Exponential backoff for reconnect does not work
> ---
>
> Key: KAFKA-13879
> URL: https://issues.apache.org/jira/browse/KAFKA-13879
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.7.0
>Reporter: Chern Yih Cheah
>Assignee: Chern Yih Cheah
>Priority: Minor
>
> When a client connects to an SSL listener using the PLAINTEXT security protocol, 
> the client considers the channel setup complete as soon as the TCP connection 
> is set up (in reality the channel setup is not complete yet). The client 
> then issues an API version request. When issuing the API version request, 
> the reconnection exponential backoff is reset. Since the broker expects an SSL 
> handshake, the client's API version request causes the connection to 
> disconnect. The reconnect then happens without exponential backoff since it has 
> been reset.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249.]
>   
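
A minimal sketch of the effect described above, assuming a plain doubling backoff without jitter. `ReconnectBackoffSketch` and its methods are hypothetical and are not Kafka's `ClusterConnectionStates` API; the base and maximum delays are arbitrary values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ReconnectBackoffSketch {
    private final long baseMs;
    private final long maxMs;
    private int failures = 0;

    public ReconnectBackoffSketch(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Delay before the next reconnect attempt; doubles per consecutive failure, capped at maxMs.
    public long onDisconnect() {
        long delay = Math.min(maxMs, baseMs << Math.min(failures, 30));
        failures++;
        return delay;
    }

    // Forgets the failure history.
    public void reset() {
        failures = 0;
    }

    // Simulates `attempts` connections that complete TCP setup and are then
    // closed before the channel is actually usable.
    public static List<Long> simulate(int attempts, boolean resetOnTcpConnect) {
        ReconnectBackoffSketch backoff = new ReconnectBackoffSketch(50, 1000);
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            if (resetOnTcpConnect) {
                backoff.reset(); // the premature reset the report describes
            }
            delays.add(backoff.onDisconnect());
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(simulate(4, true));  // prints [50, 50, 50, 50]
        System.out.println(simulate(4, false)); // prints [50, 100, 200, 400]
    }
}
```

If the reset only happens once the connection is really usable, repeated failures keep growing the delay up to the cap.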





[GitHub] [kafka] cmccabe merged pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager

2022-05-10 Thread GitBox


cmccabe merged PR #12145:
URL: https://github.com/apache/kafka/pull/12145





[GitHub] [kafka] ahuang98 commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


ahuang98 commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869715273


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -70,13 +47,17 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
+  val metadataVersion = getMetadataVersion(namespace)
+  if (metadataVersion.isLessThan(MetadataVersion.IBP_3_0_IV0)) {
+throw new TerseFailure(s"Cannot specify a metadata version less 
than ${MetadataVersion.IBP_3_0_IV0.featureLevel()}.")

Review Comment:
   nit: Create a `MetadataVersion.isKRaftSupported()` / 
`MetadataVersion.firstKRaftMetadataVersion()` method? You could use it in 
   
https://github.com/apache/kafka/pull/12050/files#diff-2ee48010b1035f21f5ebdd47a78fc632ed239179aee77e66d75a6bdb77ee8fd7R147


   and
   
https://github.com/apache/kafka/pull/12050/files#diff-602affccc2af320d6cdfda31afc197bf1a798497009c404706a600ab57c1bea5R78
   to name a few places.






[GitHub] [kafka] ijuma opened a new pull request, #12146: MINOR: Remove kraft authorizer from list of missing features

2022-05-10 Thread GitBox


ijuma opened a new pull request, #12146:
URL: https://github.com/apache/kafka/pull/12146

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ahuang98 commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


ahuang98 commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869544110


##
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##
@@ -91,6 +97,23 @@ public static NodeApiVersions create(short apiKey, short 
minVersion, short maxVe
 .setMaxVersion(maxVersion)));
 }
 
+public NodeApiVersions(ApiVersionCollection nodeApiVersions, 
SupportedFeatureKeyCollection nodeSupportedFeatures) {
+for (ApiVersion nodeApiVersion : nodeApiVersions) {

Review Comment:
   Can we call `NodeApiVersions(ApiVersionCollection nodeApiVersions)` here to 
reduce some redundant code?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -273,6 +301,40 @@ public static MetadataVersion latest() {
 return VALUES[VALUES.length - 1];
 }
 
+public Optional<MetadataVersion> previous() {
+int idx = this.ordinal();
+if (idx > 2) {
+return Optional.of(MetadataVersion.values()[idx - 1]);
+} else {
+return Optional.empty();
+}
+}
+
+public static boolean checkIfMetadataChanged(MetadataVersion 
sourceVersion, MetadataVersion targetVersion) {

Review Comment:
   Just clarifying, this is to check if the metadata has changed in an 
incompatible way between the sourceVersion and targetVersion?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -273,6 +301,40 @@ public static MetadataVersion latest() {
 return VALUES[VALUES.length - 1];
 }
 
+public Optional<MetadataVersion> previous() {
+int idx = this.ordinal();
+if (idx > 2) {
+return Optional.of(MetadataVersion.values()[idx - 1]);

Review Comment:
   ```suggestion
   return Optional.of(VALUES[idx]);
   ```
   The indexing is a bit unintuitive, I wonder if you might just want to store 
all of `values()` (including `UNINITIALIZED`) in `VALUES` just to make this 
more clear?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -273,6 +301,40 @@ public static MetadataVersion latest() {
 return VALUES[VALUES.length - 1];
 }
 
+public Optional<MetadataVersion> previous() {
+int idx = this.ordinal();
+if (idx > 2) {
+return Optional.of(MetadataVersion.values()[idx - 1]);
+} else {
+return Optional.empty();
+}
+}
+
+public static boolean checkIfMetadataChanged(MetadataVersion 
sourceVersion, MetadataVersion targetVersion) {

Review Comment:
   Let's add a test for this



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -273,6 +301,40 @@ public static MetadataVersion latest() {
 return VALUES[VALUES.length - 1];
 }
 
+public Optional<MetadataVersion> previous() {
+int idx = this.ordinal();

Review Comment:
   The indexing for `ordinal()` starts at `0` right? With 
`MetadataVersion.UNINITIALIZED` at index 0 (which I'm assuming we don't want to 
count as a valid previous version), should the conditional be `if (idx > 1)`?
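
The indexing question can be checked against a toy enum. This is only a sketch: `Version` below is a hypothetical stand-in with a sentinel at ordinal 0, and it assumes nothing about how many constants the real `MetadataVersion` declares.

```java
import java.util.Optional;

public class PreviousVersionSketch {
    // Toy stand-in: a sentinel at ordinal 0, followed by three real versions.
    public enum Version {
        UNINITIALIZED, V1, V2, V3;

        // Returns the preceding real version, never the UNINITIALIZED sentinel.
        public Optional<Version> previous() {
            int idx = ordinal();
            // idx == 0 is the sentinel and idx == 1 is the first real version;
            // neither has a valid predecessor.
            return idx > 1 ? Optional.of(values()[idx - 1]) : Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(Version.V1.previous()); // prints Optional.empty
        System.out.println(Version.V2.previous()); // prints Optional[V1]
    }
}
```

In this toy layout, a guard of `idx > 2` would also return empty for `V2`, even though `V1` is a valid predecessor.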






[GitHub] [kafka] andymg3 commented on a diff in pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager

2022-05-10 Thread GitBox


andymg3 commented on code in PR #12145:
URL: https://github.com/apache/kafka/pull/12145#discussion_r869680507


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -152,7 +154,11 @@ ControllerResult> 
deleteAcls(List filter
 results.add(new 
AclDeleteResult(ApiError.fromThrowable(e).exception()));
 }
 }
-return ControllerResult.atomicOf(records, results);
+return ControllerResult.atomicOf(dedupeApiMessageAndVersion(records), 
results);
+}
+
+private List<ApiMessageAndVersion> 
dedupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
+return new HashSet<>(messages).stream().collect(Collectors.toList());

Review Comment:
   Good point. I think we could use a Set the whole way through except at the 
very end when we have to convert it to a List.
   
   Originally I was thinking about preserving the order of the messages as I 
did the de-dupe. I had the following at first:
   ```
   return new LinkedHashSet<>(messages).stream().collect(Collectors.toList());
   ```
   but then I realized the ordering shouldn't matter. So I think I can make the 
suggested change to just use a Set.
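
For reference, a minimal sketch of the order-preserving variant mentioned above. It uses a generic helper with a hypothetical name rather than the PR's method, and it assumes the element type implements `equals` and `hashCode` consistently.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class DedupeSketch {
    // Drops duplicates while keeping the first occurrence of each element in place.
    public static <T> List<T> dedupePreservingOrder(List<T> messages) {
        return new ArrayList<>(new LinkedHashSet<>(messages));
    }

    public static void main(String[] args) {
        System.out.println(dedupePreservingOrder(List.of("c", "a", "c", "b", "a"))); // prints [c, a, b]
    }
}
```

A plain `HashSet` removes the same duplicates but gives no guarantee about the order of the resulting list.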






[jira] [Commented] (KAFKA-13621) Resign leader on network partition

2022-05-10 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534578#comment-17534578
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13621:


[~hachikuji] mentioned that after we approve and implement KIP-835 the leader is 
guaranteed to append a record every 
`metadata.monitor.write.interval.ms` (or 
`controller.monitor.write.interval.ms`). We can use this feature to resign the 
leader if written records have not committed after a fetch timeout.

I think this solution seems reasonable. I have the following questions:
 # Should we do this in the Controller/metadata module or in the raft module?
 # How do we handle quorums that are fetching but are slow to catch up to the 
high-watermark because they have a small log?

> Resign leader on network partition
> --
>
> Key: KAFKA-13621
> URL: https://issues.apache.org/jira/browse/KAFKA-13621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> h1. Motivation
> If the current leader A at epoch X gets partitioned from the rest of the 
> quorum, voter A will stay leader at epoch X. This happens because 
> voter A will never receive a request from the rest of the voters that increases 
> the epoch. The requests that typically increase the epoch of past leaders 
> are BeginQuorumEpoch and Vote.
> In addition, if voter A (leader at epoch X) doesn't get partitioned from the 
> rest of the brokers (observers in the KRaft protocol), the brokers will never 
> learn about the new quorum leader. This happens because 1. observers learn 
> about the leader from the Fetch response and 2. observers send a Fetch request 
> to a random leader if the Fetch request times out.
> Neither of these two mechanisms will cause the broker to send a request to a 
> different voter: the leader at epoch X will never report a different 
> leader in the response, and the Fetch request will never time out.
> h1. Proposed Changes
> In this scenario A, the leader at epoch X, will stop receiving Fetch 
> requests from the majority of the voters. Voter A should resign as leader if 
> the last Fetch request from the majority of the voters is old enough. A reasonable 
> value for "old enough" is the Fetch timeout value.
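
A minimal sketch of the proposed check, under two assumptions that the description does not spell out: the leader counts itself toward the majority, and it tracks the time of the last Fetch request per follower. The class, method, and parameter names are hypothetical and do not come from the raft module.

```java
import java.util.Map;

public class LeaderResignSketch {
    // Returns true when the leader can no longer account for a majority of voters,
    // i.e. too few followers have fetched within the fetch timeout.
    public static boolean shouldResign(Map<Integer, Long> lastFetchMsByFollower,
                                       int voterCount, long nowMs, long fetchTimeoutMs) {
        long recentFollowers = lastFetchMsByFollower.values().stream()
            .filter(lastFetchMs -> nowMs - lastFetchMs <= fetchTimeoutMs)
            .count();
        long reachable = recentFollowers + 1; // assumption: the leader counts itself
        return reachable <= voterCount / 2;   // a majority needs more than half of the voters
    }

    public static void main(String[] args) {
        // Three voters: the leader plus followers 2 and 3, with a 2000 ms fetch timeout.
        System.out.println(shouldResign(Map.of(2, 9_000L, 3, 1_000L), 3, 10_000L, 2_000L)); // prints false
        System.out.println(shouldResign(Map.of(2, 1_000L, 3, 1_000L), 3, 10_000L, 2_000L)); // prints true
    }
}
```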





[GitHub] [kafka] hachikuji commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading

2022-05-10 Thread GitBox


hachikuji commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r868498298


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -725,7 +727,11 @@ object KafkaConfig {
 
   /* Authorizer Configuration ***/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements ${classOf[Authorizer].getName}" +
-  " interface, which is used by the broker for authorization."
+" interface, which is used by the broker for authorization."
+  val EarlyStartListenersDoc = "A comma-separated list of listener names which 
should be started before any non-early start listeners. " +

Review Comment:
   nit: the first sentence seems circular. I think it would be helpful to 
mention the authorizer use case. Also, can we say something about the default?






[GitHub] [kafka] mumrah commented on a diff in pull request #12062: KAFKA-13833: Remove the min_version_level from the finalized version written to ZooKeeper

2022-05-10 Thread GitBox


mumrah commented on code in PR #12062:
URL: https://github.com/apache/kafka/pull/12062#discussion_r869578975


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -86,15 +86,15 @@ class DefaultApiVersionManager(
 throttleTimeMs,
 interBrokerProtocolVersion.highestSupportedRecordVersion,
 supportedFeatures,
-finalizedFeatures.features,
+finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,

Review Comment:
   Is this just needed to convert the Scala short to Java short in the map?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -148,7 +148,10 @@ public enum MetadataVersion {
 IBP_3_1_IV0(3, "3.1", "IV0"),
 
 // Support for leader recovery for unclean leader election (KIP-704)
-IBP_3_2_IV0(4, "3.2", "IV0");
+IBP_3_2_IV0(4, "3.2", "IV0"),
+
+// Removes min_version_level from the finalized version range that is 
written to ZooKeeper
+IBP_3_2_IV1(5, "3.2", "IV1");

Review Comment:
   Should this be 3.3-IV0 since we're on the 3.3 release now? (i.e., current 
build version is 3.3-SNAPSHOT)



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -148,7 +148,10 @@ public enum MetadataVersion {
 IBP_3_1_IV0(3, "3.1", "IV0"),
 
 // Support for leader recovery for unclean leader election (KIP-704)
-IBP_3_2_IV0(4, "3.2", "IV0");
+IBP_3_2_IV0(4, "3.2", "IV0"),
+
+// Removes min_version_level from the finalized version range that is 
written to ZooKeeper

Review Comment:
   Can you add a mention of the KIP here?






[GitHub] [kafka] hachikuji merged pull request #12082: MINOR: Create case class to encapsulate fetch parameters and simplify handling

2022-05-10 Thread GitBox


hachikuji merged PR #12082:
URL: https://github.com/apache/kafka/pull/12082





[GitHub] [kafka] cmccabe commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


cmccabe commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869639379


##
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##
@@ -233,4 +256,8 @@ public ApiVersion apiVersion(ApiKeys apiKey) {
 public Map allSupportedApiVersions() {
 return supportedVersions;
 }
+
+public Map supportedFeatures() {
+return Collections.unmodifiableMap(supportedFeatures);

Review Comment:
   It seems like `supportedFeatures` should be immutable and final in any case. 
There is no use-case for mutating this after the object is constructed.
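
One way to get that property is to copy and wrap the map once in the constructor and store it in a `final` field. The sketch below is an assumption-laden illustration: the class name is hypothetical and `Short` is a placeholder value type, not necessarily what `NodeApiVersions` stores.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableFeaturesSketch {
    // Wrapped once at construction; the accessor can return it directly.
    private final Map<String, Short> supportedFeatures;

    public ImmutableFeaturesSketch(Map<String, Short> features) {
        // Defensive copy, so later changes to the caller's map are not visible here.
        this.supportedFeatures = Collections.unmodifiableMap(new HashMap<>(features));
    }

    public Map<String, Short> supportedFeatures() {
        return supportedFeatures;
    }

    // Helper for the example: reports whether the map refuses modification.
    public static boolean rejectsMutation(Map<String, Short> map) {
        try {
            map.put("x", (short) 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Short> source = new HashMap<>();
        source.put("metadata.version", (short) 5);
        ImmutableFeaturesSketch node = new ImmutableFeaturesSketch(source);
        source.put("other.feature", (short) 1);                 // does not leak into the node
        System.out.println(node.supportedFeatures().size());    // prints 1
        System.out.println(rejectsMutation(node.supportedFeatures())); // prints true
    }
}
```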






[GitHub] [kafka] hachikuji commented on a diff in pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager

2022-05-10 Thread GitBox


hachikuji commented on code in PR #12145:
URL: https://github.com/apache/kafka/pull/12145#discussion_r869633002


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -152,7 +154,11 @@ ControllerResult> 
deleteAcls(List filter
 results.add(new 
AclDeleteResult(ApiError.fromThrowable(e).exception()));
 }
 }
-return ControllerResult.atomicOf(records, results);
+return ControllerResult.atomicOf(dedupeApiMessageAndVersion(records), 
results);
+}
+
+private List<ApiMessageAndVersion> 
dedupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
+return new HashSet<>(messages).stream().collect(Collectors.toList());

Review Comment:
   nit: doesn't make a big difference, but any reason not to begin with a Set 
instead of converting from a List?






[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869631802


##
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##
@@ -55,25 +60,37 @@ public class FeatureControlManager {
  */
 private final TimelineHashMap finalizedVersions;
 
+/**
+ * The current metadata version
+ */
+private final TimelineObject metadataVersion;
+
+/**
+ * Collection of listeners for when features change
+ */
+private final Map listeners;
 
 FeatureControlManager(LogContext logContext,
   QuorumFeatures quorumFeatures,
   SnapshotRegistry snapshotRegistry) {
 this.log = logContext.logger(FeatureControlManager.class);
 this.quorumFeatures = quorumFeatures;
 this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.metadataVersion = new TimelineObject<>(snapshotRegistry, 
MetadataVersion.UNINITIALIZED);

Review Comment:
   UNINITIALIZED was added since we need a non-null object to initialize the 
TimelineObject






[GitHub] [kafka] cmccabe commented on pull request #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager

2022-05-10 Thread GitBox


cmccabe commented on PR #12145:
URL: https://github.com/apache/kafka/pull/12145#issuecomment-1122797307

   > I don't think we need to dedupe `List<AclDeleteResult>`. It contains a list 
of results, where each result contains the ACLs that matched the filter. It 
should be OK for the same ACL to be in multiple AclDeleteResult results because 
it really could match multiple filters.
   
   Agreed





[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869621801


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -63,6 +64,8 @@ class ControllerServer(
   val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
   val configSchema: KafkaConfigSchema,
+  val raftApiVersions: ApiVersions,
+  val bootstrapMetadata: Option[BootstrapMetadata] = None

Review Comment:
   I still need to remove this Option and make it a regular argument.






[GitHub] [kafka] cmccabe merged pull request #12142: MINOR: install Exit.exit handler in BrokerMetadataPublisherTest

2022-05-10 Thread GitBox


cmccabe merged PR #12142:
URL: https://github.com/apache/kafka/pull/12142





[jira] [Resolved] (KAFKA-13862) Add And Subtract multiple config values is not supported in KRaft mode

2022-05-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13862.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Add And Subtract multiple config values is not supported in KRaft mode
> --
>
> Key: KAFKA-13862
> URL: https://issues.apache.org/jira/browse/KAFKA-13862
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
> Fix For: 3.3.0
>
>






[GitHub] [kafka] hachikuji merged pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-10 Thread GitBox


hachikuji merged PR #12108:
URL: https://github.com/apache/kafka/pull/12108





[GitHub] [kafka] andymg3 opened a new pull request, #12145: KAFKA-13892: Dedupe RemoveAccessControlEntryRecord in deleteAcls response of AclControlManager

2022-05-10 Thread GitBox


andymg3 opened a new pull request, #12145:
URL: https://github.com/apache/kafka/pull/12145

   ## JIRA
   https://issues.apache.org/jira/browse/KAFKA-13892
   
   ### Details
   In 
https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143
 we loop through the ACL filters and add `RemoveAccessControlEntryRecord` 
records to the response list for each matching ACL. There's a bug where if two 
(or more) filters match the same ACL, we create two (or more) 
`RemoveAccessControlEntryRecord` records for the same ACL. This is an issue 
because upon replay we throw an exception 
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
 if the ACL is not in the in-memory data structures which will happen to the 
second `RemoveAccessControlEntryRecord`.
   
   I don't think we need to dedupe `List<AclDeleteResult>`. It contains a list 
of results, where each result contains the ACLs that matched the filter. It 
should be OK for the same ACL to be in multiple AclDeleteResult results because 
it really could match multiple filters.
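
A minimal sketch of the failure mode described above. It models ACLs as strings and filters as predicates, which is a simplification; `AclReplaySketch`, its methods, and the `IllegalStateException` are hypothetical and are not the `AclControlManager` implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class AclReplaySketch {
    private final Set<String> acls = new HashSet<>();

    public AclReplaySketch(List<String> initial) {
        acls.addAll(initial);
    }

    // Emits one removal record per (filter, matching ACL) pair, optionally de-duplicated.
    public List<String> removalRecords(List<Predicate<String>> filters, boolean dedupe) {
        List<String> records = new ArrayList<>();
        for (Predicate<String> filter : filters) {
            for (String acl : acls) {
                if (filter.test(acl)) {
                    records.add(acl);
                }
            }
        }
        return dedupe ? new ArrayList<>(new LinkedHashSet<>(records)) : records;
    }

    // Applies removal records; throws when a record refers to an ACL that is already gone.
    public void replay(List<String> records) {
        for (String acl : records) {
            if (!acls.remove(acl)) {
                throw new IllegalStateException("ACL not found: " + acl);
            }
        }
    }

    // Two filters that both match "alice:read"; returns whether replay completes.
    public static boolean replaySucceeds(boolean dedupe) {
        AclReplaySketch state = new AclReplaySketch(List.of("alice:read", "bob:write"));
        List<Predicate<String>> filters =
            List.of(acl -> acl.startsWith("alice"), acl -> acl.endsWith("read"));
        try {
            state.replay(state.removalRecords(filters, dedupe));
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(replaySucceeds(false)); // prints false
        System.out.println(replaySucceeds(true));  // prints true
    }
}
```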
   
   
   ### Testing
   - Added a unit test that tests the new behavior
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534543#comment-17534543
 ] 

Andrew Grant commented on KAFKA-13892:
--

I don't think we need to dedupe List<AclDeleteResult>. It contains a list of 
results, where each result contains the ACLs that matched the filter. It should 
be OK for the same ACL to be in multiple AclDeleteResult results because it 
really could match multiple filters.

> Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
> 
>
> Key: KAFKA-13892
> URL: https://issues.apache.org/jira/browse/KAFKA-13892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
>  we loop through the ACL filters and add RemoveAccessControlEntryRecord 
> records to the response list for each matching ACL. I think there's a bug 
> where if two filters match the same ACL, we create two 
> RemoveAccessControlEntryRecord records for that same ACL. This is an issue 
> because upon replay we throw an exception 
> (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
>  if the ACL is not in the in-memory data structures which will happen to the 
> second RemoveAccessControlEntryRecord.
> Maybe we can just de-dupe both List<ApiMessageAndVersion> and 
> List<AclDeleteResult>? I think something like (just showing code for 
> ApiMessageAndVersion):
> {code:java}
> private List<ApiMessageAndVersion> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
>     return new HashSet<>(messages).stream().collect(Collectors.toList());
> }{code}
> should suffice as I don't think the ordering matters within the list of 
> response objects.  





[jira] [Updated] (KAFKA-13892) Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-13892:
-
Summary: Dedupe RemoveAccessControlEntryRecord in deleteAcls of 
AclControlManager  (was: Dedupe response objects in deleteAcls of 
AclControlManager )

> Dedupe RemoveAccessControlEntryRecord in deleteAcls of AclControlManager
> 
>
> Key: KAFKA-13892
> URL: https://issues.apache.org/jira/browse/KAFKA-13892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
>  we loop through the ACL filters and add RemoveAccessControlEntryRecord 
> records to the response list for each matching ACL. I think there's a bug 
> where if two filters match the same ACL, we create two 
> RemoveAccessControlEntryRecord records for that same ACL. This is an issue 
> because upon replay we throw an exception 
> (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
>  if the ACL is not in the in-memory data structures which will happen to the 
> second RemoveAccessControlEntryRecord.
> Maybe we can just de-dupe both List<ApiMessageAndVersion> and 
> List<AclDeleteResult>? I think something like (just showing code for 
> ApiMessageAndVersion):
> {code:java}
> private List<ApiMessageAndVersion> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
>     return new HashSet<>(messages).stream().collect(Collectors.toList());
> }{code}
> should suffice as I don't think the ordering matters within the list of 
> response objects.  





[jira] [Assigned] (KAFKA-13892) Dedupe response objects in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant reassigned KAFKA-13892:


Assignee: Andrew Grant

> Dedupe response objects in deleteAcls of AclControlManager 
> ---
>
> Key: KAFKA-13892
> URL: https://issues.apache.org/jira/browse/KAFKA-13892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
>  we loop through the ACL filters and add RemoveAccessControlEntryRecord 
> records to the response list for each matching ACL. I think there's a bug 
> where if two filters match the same ACL, we create two 
> RemoveAccessControlEntryRecord records for that same ACL. This is an issue 
> because upon replay we throw an exception 
> (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
>  if the ACL is not in the in-memory data structures which will happen to the 
> second RemoveAccessControlEntryRecord.
> Maybe we can just de-dupe both List<ApiMessageAndVersion> and 
> List<AclDeleteResult>? I think something like (just showing code for 
> ApiMessageAndVersion):
> {code:java}
> private List<ApiMessageAndVersion> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
>     return new HashSet<>(messages).stream().collect(Collectors.toList());
> }{code}
> should suffice as I don't think the ordering matters within the list of 
> response objects.  





[GitHub] [kafka] ijuma opened a new pull request, #12144: MINOR: reload4j dependency fixes

2022-05-10 Thread GitBox


ijuma opened a new pull request, #12144:
URL: https://github.com/apache/kafka/pull/12144

   * Replace `log4j` with `reload4j` in `copyDependantLibs`
   * Exclude `log4j` and `slf4j-log4j12` transitive dependencies for
     `streams:upgrade-system-tests`. Versions 0100 and 0101
 had a transitive dependency to `log4j` and `slf4j-log4j12` via
 `zkclient` and `zookeeper`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-13892) Dedupe response objects in deleteAcls of AclControlManager

2022-05-10 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-13892:


 Summary: Dedupe response objects in deleteAcls of 
AclControlManager 
 Key: KAFKA-13892
 URL: https://issues.apache.org/jira/browse/KAFKA-13892
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew Grant


In 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L143]
 we loop through the ACL filters and add RemoveAccessControlEntryRecord 
records to the response list for each matching ACL. I think there's a bug where 
if two filters match the same ACL, we create two RemoveAccessControlEntryRecord 
records for that same ACL. This is an issue because upon replay we throw an 
exception 
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L195)
 if the ACL is not in the in-memory data structures which will happen to the 
second RemoveAccessControlEntryRecord.

Maybe we can just de-dupe both List<ApiMessageAndVersion> and 
List<AclDeleteResult>? I think something like (just showing code for 
ApiMessageAndVersion):
{code:java}
private List<ApiMessageAndVersion> deDupeApiMessageAndVersion(List<ApiMessageAndVersion> messages) {
    return new HashSet<>(messages).stream().collect(Collectors.toList());
}{code}
should suffice as I don't think the ordering matters within the list of 
response objects.  
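
If ordering ever does turn out to matter, a variant that keeps first-seen order is possible. The following is only a sketch, not code from AclControlManager: the class name and the string stand-ins for records are hypothetical, and it assumes the element type has value-based equals() and hashCode().

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class DedupeSketch {
    // Removes duplicates while keeping the order in which items first appeared.
    // Assumes T implements value-based equals() and hashCode().
    static <T> List<T> deDupe(List<T> items) {
        return new ArrayList<>(new LinkedHashSet<>(items));
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins: two filters matching the same ACL yield the same record twice.
        List<String> records = List.of("remove-acl-1", "remove-acl-2", "remove-acl-1");
        System.out.println(deDupe(records)); // prints [remove-acl-1, remove-acl-2]
    }
}
```

The only difference from the HashSet version is that LinkedHashSet remembers insertion order, at the cost of a little extra memory per element.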



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mumrah commented on pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


mumrah commented on PR #12050:
URL: https://github.com/apache/kafka/pull/12050#issuecomment-1122497005

   @dengziming addressing your old comment:
   
   > Thank you for this PR @mumrah , I have a question: should we bump IBP from 
3.2-0 to 3.2-1, then in the future when migrating ZooKeeper clusters to KRaft, 
we can set metadataVersion=1 if the ibp is behind 3.2-0, and set 
metadataVersion=2 if ibp equals 3.2-1.
   
   With #12072, we have associated a `metadata.version` with each IBP starting 
with 3.0-IV0. In this PR, I've added 3.3-IV0 which will be metadata version 5. 





[GitHub] [kafka] mumrah commented on a diff in pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-10 Thread GitBox


mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r869326805


##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -102,6 +106,8 @@ class KafkaRaftServer(
   threadNamePrefix,
   controllerQuorumVotersFuture,
   KafkaRaftServer.configSchema,
+  raftApiVersions,
+  Some(bootstrapMetadata)

Review Comment:
   That's an interesting point. For the broker, on first start, it will start 
up without a `metadata.version`. It won't be until the bootstrapped version is 
written by the controller and replicated to the broker that 
BrokerMetadataPublisher will update the FinalizedFeatureCache. 
   
   This is probably okay though, since brokers start up in a fenced state in 
KRaft anyways. Since the `metadata.version` bootstrapping is done in the leader 
election callback on the controller, we should have the FeatureLevelRecord 
written out before any broker registrations occur. I think this should avoid 
any issues with a broker becoming unfenced before knowing what the 
`metadata.version` is.
   
   Does that make sense? Or did I misunderstand your question :)
   
   






[GitHub] [kafka] Gerrrr commented on pull request #12122: WIP: Upgrade tests for KAFKA-13769

2022-05-10 Thread GitBox


Gerrrr commented on PR #12122:
URL: https://github.com/apache/kafka/pull/12122#issuecomment-1122378479

   @mjsax Can you please review?





[GitHub] [kafka] showuon commented on a diff in pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS

2022-05-10 Thread GitBox


showuon commented on code in PR #12140:
URL: https://github.com/apache/kafka/pull/12140#discussion_r869204407


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -807,6 +807,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
              "Sent generation was {}", sentGeneration);
+    resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);

Review Comment:
   We might need to add a comment here to explain why we need to reset the 
generation ID.






[GitHub] [kafka] cadonna merged pull request #12143: MINOR: Update release versions for upgrade tests with ongoing release

2022-05-10 Thread GitBox


cadonna merged PR #12143:
URL: https://github.com/apache/kafka/pull/12143





[GitHub] [kafka] aiquestion commented on pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS

2022-05-10 Thread GitBox


aiquestion commented on PR #12140:
URL: https://github.com/apache/kafka/pull/12140#issuecomment-1122343498

   @[showuon](https://github.com/showuon) can you please help to review this?





[GitHub] [kafka] viktorsomogyi closed pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2022-05-10 Thread GitBox


viktorsomogyi closed pull request #9519: KAFKA-10650: Use Murmur3 instead of 
MD5 in SkimpyOffsetMap
URL: https://github.com/apache/kafka/pull/9519





[GitHub] [kafka] viktorsomogyi commented on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2022-05-10 Thread GitBox


viktorsomogyi commented on PR #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-1122331820

   Closing this due to inactivity on the reviewers' part.





[GitHub] [kafka] aiquestion commented on pull request #12140: KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS

2022-05-10 Thread GitBox


aiquestion commented on PR #12140:
URL: https://github.com/apache/kafka/pull/12140#issuecomment-1122315793

   created a jira for it: https://issues.apache.org/jira/browse/KAFKA-13891





[jira] [Created] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative

2022-05-10 Thread Shawn Wang (Jira)
Shawn Wang created KAFKA-13891:
--

 Summary: sync group failed with rebalanceInProgress error cause 
rebalance many rounds in cooperative
 Key: KAFKA-13891
 URL: https://issues.apache.org/jira/browse/KAFKA-13891
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.0.0
Reporter: Shawn Wang


This issue was first found in 
[KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419].

However, the previous PR did not reset the generation when sync group fails with a 
rebalanceInProgress error, so the original bug still exists and may cause a 
consumer to go through many rounds of rebalance before the group becomes stable.

Here's the example ({*}bold is added{*}):
 # Consumer A joined and synced the group successfully with generation 1 *(with 
ownedPartition P1/P2)*.
 # A new rebalance started with generation 2. Consumer A joined successfully 
but, for some reason, did not send out its sync group request immediately.
 # The other consumers completed sync group successfully in generation 2; only 
consumer A did not.
 # After consumer A sent out its sync group request, a new rebalance started 
with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync 
group response.
 # On receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, 
but with the assignment (ownedPartition) from generation 1.
 # So an out-of-date ownedPartition is sent, with unexpected results.
 # *After the generation-3 rebalance, consumer A got partitions P3/P4. The 
ownedPartition was ignored because it came from an old generation.*
 # *Consumer A revoked P1/P2 and re-joined to start a new round of rebalance.*
 # *If some other consumer C fails to syncGroup before consumer A's joinGroup, 
the same issue happens again and results in many rounds of rebalance before the 
group is stable.*
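
The effect of resetting on REBALANCE_IN_PROGRESS can be illustrated with a toy model. This is a simplified sketch, not the AbstractCoordinator code: the class, its fields, and the assumption that a reset both clears the generation and drops the remembered ownedPartition are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SyncGroupResetSketch {
    static final int NO_GENERATION = -1;

    int generation = NO_GENERATION;
    List<String> ownedPartitions = new ArrayList<>();

    // Successful SyncGroup: remember the generation and the assignment it produced.
    void onSyncSuccess(int newGeneration, List<String> assignment) {
        generation = newGeneration;
        ownedPartitions = new ArrayList<>(assignment);
    }

    // SyncGroup failed with REBALANCE_IN_PROGRESS. In this model (an assumption),
    // a reset forgets the generation and the partitions the member believed it owned.
    void onRebalanceInProgress(boolean reset) {
        if (reset) {
            generation = NO_GENERATION;
            ownedPartitions = new ArrayList<>();
        }
    }

    // Partitions the member would claim in its next JoinGroup.
    List<String> ownedPartitionsForRejoin() {
        return ownedPartitions;
    }

    public static void main(String[] args) {
        SyncGroupResetSketch withoutReset = new SyncGroupResetSketch();
        withoutReset.onSyncSuccess(1, List.of("P1", "P2"));
        withoutReset.onRebalanceInProgress(false);
        System.out.println(withoutReset.ownedPartitionsForRejoin()); // prints [P1, P2] (stale claim)

        SyncGroupResetSketch withReset = new SyncGroupResetSketch();
        withReset.onSyncSuccess(1, List.of("P1", "P2"));
        withReset.onRebalanceInProgress(true);
        System.out.println(withReset.ownedPartitionsForRejoin()); // prints []
    }
}
```

Without the reset, the member re-joins claiming the generation-1 partitions, which is the stale ownedPartition described in steps 5 and 6.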

 





[GitHub] [kafka] rajinisivaram merged pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases

2022-05-10 Thread GitBox


rajinisivaram merged PR #12131:
URL: https://github.com/apache/kafka/pull/12131





[GitHub] [kafka] rajinisivaram commented on pull request #12131: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases

2022-05-10 Thread GitBox


rajinisivaram commented on PR #12131:
URL: https://github.com/apache/kafka/pull/12131#issuecomment-1122212951

   @chernyih Thanks for the PR, LGTM. Seems like a reasonable change since we 
are just ensuring that backoff is applied unless ApiVersions response is 
received. Since we were applying exponential backoff for authentication 
failures, delaying to ApiVersions response seems ok. Merging to trunk.
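
For readers following along, the idea of keeping backoff in place until the node is actually ready can be sketched as below. This is a hypothetical model, not NetworkClient code: the class and method names are made up, jitter is omitted, and it assumes the failure count is cleared only on a "ready" signal (the ApiVersions response, in this PR's terms) rather than on every successful connect.

```java
class ReconnectBackoffSketch {
    private final long baseMs;
    private final long maxMs;
    private int failures = 0;

    ReconnectBackoffSketch(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Delay before the next connection attempt: baseMs doubled once per
    // recorded failure, capped at maxMs.
    long nextDelayMs() {
        long delay = baseMs;
        for (int i = 0; i < failures && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // The connection was closed before the node became ready.
    void onDisconnectBeforeReady() {
        failures++;
    }

    // The node is fully ready; only now is the backoff cleared.
    void onReady() {
        failures = 0;
    }

    public static void main(String[] args) {
        ReconnectBackoffSketch backoff = new ReconnectBackoffSketch(50, 1000);
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println(backoff.nextDelayMs()); // 50, 100, 200, 400, 800, 1000
            backoff.onDisconnectBeforeReady();
        }
        backoff.onReady();
        System.out.println(backoff.nextDelayMs()); // 50
    }
}
```

If the count were cleared as soon as the socket connected, a node that accepts connections but never becomes ready would be retried at the base delay every time, which is the ineffective case the PR title refers to.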





[GitHub] [kafka] cadonna opened a new pull request, #12143: MINOR: Update release versions for upgrade tests with ongoing release

2022-05-10 Thread GitBox


cadonna opened a new pull request, #12143:
URL: https://github.com/apache/kafka/pull/12143

   
   





[GitHub] [kafka] sayantanu-dey commented on pull request #12035: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-05-10 Thread GitBox


sayantanu-dey commented on PR #12035:
URL: https://github.com/apache/kafka/pull/12035#issuecomment-1122126444

   @guozhangwang @dajac sorry for being late. I went through the whole 
conversation and have pushed the suggested changes.
   1. I am now returning the `close` status after resolving the future returned 
by `adminClient#removeMembersFromConsumerGroup`.
   2. I have also added `membersToRemove` to the 
`RemoveMembersFromConsumerGroupOptions` constructor.





[GitHub] [kafka] showuon commented on pull request #12113: MINOR: Small cleanups in connect/mirror

2022-05-10 Thread GitBox


showuon commented on PR #12113:
URL: https://github.com/apache/kafka/pull/12113#issuecomment-1121995342

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
   Build / JDK 11 and Scala 2.13 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   ```





[GitHub] [kafka] showuon merged pull request #12113: MINOR: Small cleanups in connect/mirror

2022-05-10 Thread GitBox


showuon merged PR #12113:
URL: https://github.com/apache/kafka/pull/12113

