[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-17 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r634023546



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+
+public final class TopicBasedRemoteLogMetadataManagerConfig {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName());
+
+public static final String REMOTE_LOG_METADATA_TOPIC_NAME = 
"__remote_log_metadata";
+
+public static final String 
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = 
"remote.log.metadata.topic.replication.factor";
+public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = 
"remote.log.metadata.topic.num.partitions";
+public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_PROP 
= "remote.log.metadata.topic.retention.ms";
+public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = 
"remote.log.metadata.publish.wait.ms";
+
+public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
+public static final long 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
+public static final int 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
+public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 60 
* 1000L;
+
+public static final String 
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of 
remote log metadata Topic.";
+public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The 
number of partitions for remote log metadata Topic.";
+public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_DOC 
= "Remote log metadata topic log retention in milli seconds." +
+"Default: -1, that means unlimited. Users can configure this value 
based on their use cases. " +
+"To avoid any data loss, this value should be more than the 
maximum retention period of any topic enabled with " +
+"tiered storage in the cluster.";
+public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The 
amount of time in milli seconds to wait for the local consumer to " +
+"receive the published event.";
+
+public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = 
"remote.log.metadata.common.client.";
+public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = 
"remote.log.metadata.producer.";
+public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = 
"remote.log.metadata.consumer.";
+
+private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = 
"__remote_log_metadata_client";
+private static final String BROKER_ID = "broker.id";
+
+private static final ConfigDef CONFIG = new ConfigDef();
+static {
+CONFIG.define(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, INT, 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR, atLeast(1), LOW,

Review comment:
   `Config` definition will be moved to `KafkaConfig` later when default 
RLMM is integrated with the broker. 




-- 
This is an automated message from the Apache 

[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

2021-05-17 Thread GitBox


vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-842807268


   thanks @ableegoldman , have updated the main KIP page, the streams kiP sub 
page and the actual kiP. Thanks for your support!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.

2021-05-17 Thread Ki Lok Wong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346566#comment-17346566
 ] 

Ki Lok Wong commented on KAFKA-12804:
-

Yep, I will do it later this arvo.  Cheers.

> In-Memory state store backed by InMemoryKeyValueStore is slow due to 
> synchronizing reads/writes using a single lock.
> 
>
> Key: KAFKA-12804
> URL: https://issues.apache.org/jira/browse/KAFKA-12804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Ki Lok Wong
>Assignee: Ki Lok Wong
>Priority: Minor
>  Labels: performance, pull-request-available
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the 
> backing map implementation.  Read/write methods such as get() and put() are 
> synchronized on the same lock hence significantly reducing multi-thread 
> performance.
> https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout

2021-05-17 Thread GitBox


showuon commented on pull request #10715:
URL: https://github.com/apache/kafka/pull/10715#issuecomment-842799201


   @ableegoldman , as discussed, could you help review this PR to increase 
session timeout? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.

2021-05-17 Thread Ki Lok Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ki Lok Wong reassigned KAFKA-12804:
---

Assignee: Ki Lok Wong

> In-Memory state store backed by InMemoryKeyValueStore is slow due to 
> synchronizing reads/writes using a single lock.
> 
>
> Key: KAFKA-12804
> URL: https://issues.apache.org/jira/browse/KAFKA-12804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Ki Lok Wong
>Assignee: Ki Lok Wong
>Priority: Minor
>  Labels: performance, pull-request-available
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the 
> backing map implementation.  Read/write methods such as get() and put() are 
> synchronized on the same lock hence significantly reducing multi-thread 
> performance.
> https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.

2021-05-17 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346554#comment-17346554
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12804:


Hey [~gorsonw] thanks for the ticket, this class certainly has not been 
optimized much (or at all). I see you applied the "pull request available" 
label to this, can you include the link to your PR in the ticket and assign it 
to yourself if you plan to work on this?

> In-Memory state store backed by InMemoryKeyValueStore is slow due to 
> synchronizing reads/writes using a single lock.
> 
>
> Key: KAFKA-12804
> URL: https://issues.apache.org/jira/browse/KAFKA-12804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Ki Lok Wong
>Priority: Minor
>  Labels: performance, pull-request-available
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the 
> backing map implementation.  Read/write methods such as get() and put() are 
> synchronized on the same lock hence significantly reducing multi-thread 
> performance.
> https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon opened a new pull request #10715: KAFKA-9295: increase heartbeat and session timeout

2021-05-17 Thread GitBox


showuon opened a new pull request #10715:
URL: https://github.com/apache/kafka/pull/10715


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.

2021-05-17 Thread Ki Lok Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ki Lok Wong updated KAFKA-12804:

Remaining Estimate: 2h  (was: 2m)
 Original Estimate: 2h  (was: 2m)

> In-Memory state store backed by InMemoryKeyValueStore is slow due to 
> synchronizing reads/writes using a single lock.
> 
>
> Key: KAFKA-12804
> URL: https://issues.apache.org/jira/browse/KAFKA-12804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Ki Lok Wong
>Priority: Minor
>  Labels: performance, pull-request-available
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the 
> backing map implementation.  Read/write methods such as get() and put() are 
> synchronized on the same lock hence significantly reducing multi-thread 
> performance.
> https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12804) In-Memory state store backed by InMemoryKeyValueStore is slow due to synchronizing reads/writes using a single lock.

2021-05-17 Thread Ki Lok Wong (Jira)
Ki Lok Wong created KAFKA-12804:
---

 Summary: In-Memory state store backed by InMemoryKeyValueStore is 
slow due to synchronizing reads/writes using a single lock.
 Key: KAFKA-12804
 URL: https://issues.apache.org/jira/browse/KAFKA-12804
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.6.2
Reporter: Ki Lok Wong


Default In-Memory state store (InMemoryKeyValueStore) uses TreeMap as the 
backing map implementation.  Read/write methods such as get() and put() are 
synchronized on the same lock hence significantly reducing multi-thread 
performance.

https://github.com/a0x8o/kafka/blob/f834be5e73e46805b29df9845bf09be98f91fc09/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L78



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-05-17 Thread GitBox


showuon commented on pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#issuecomment-842772724


   @ijuma , could you take a look again for this PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10665: KAFKA-9009: increase replica.lag.time.max.ms to make the test reliable

2021-05-17 Thread GitBox


showuon commented on pull request #10665:
URL: https://github.com/apache/kafka/pull/10665#issuecomment-842772461


   @edoardocomar @mimaison  , could you help review this PR to make the test 
reliable. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9311: KAFKA-9910: Implement new transaction timed out error

2021-05-17 Thread GitBox


abbccdda commented on pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#issuecomment-842758541


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633958458



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -936,21 +944,13 @@ private QuorumController(LogContext logContext,
 this.replicationControl = new 
ReplicationControlManager(snapshotRegistry,
 logContext, defaultReplicationFactor, defaultNumPartitions,
 configurationControl, clusterControl, controllerMetrics);
-this.logManager = logManager;
+this.raftClient = raftClient;
 this.metaLogListener = new QuorumMetaLogListener();
-this.curClaimEpoch = -1L;
-this.lastCommittedOffset = snapshotReader.epoch();
+this.curClaimEpoch = -1;
+this.lastCommittedOffset = -1L;
 this.writeOffset = -1L;
 
-while (snapshotReader.hasNext()) {
-List batch = snapshotReader.next();
-long index = 0;
-for (ApiMessage message : batch) {
-replay(message, snapshotReader.epoch(), index++);
-}
-}
-snapshotRegistry.createSnapshot(lastCommittedOffset);

Review comment:
   Okay. I didn't want do this as part of the merge commit but I'll add it 
back as a separate commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633957907



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##
@@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int,
 clientQuotaManager.handleQuotaRecord(record)
   }
 
-  class HandleNewLeaderEvent(leader: MetaLogLeader)
+  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
   extends EventQueue.FailureLoggingEvent(log) {
 override def run(): Unit = {
   val imageBuilder =
 MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-  if (leader.nodeId() < 0) {
-imageBuilder.controllerId(None)
-  } else {
-imageBuilder.controllerId(Some(leader.nodeId()))
-  }
+  imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
   metadataCache.image(imageBuilder.build())
 }
   }
 
-  override def handleNewLeader(leader: MetaLogLeader): Unit = {
-eventQueue.append(new HandleNewLeaderEvent(leader))
+  override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+if (leader.isLeader(brokerId)) {
+  eventQueue.append(new HandleNewLeaderEvent(leader))

Review comment:
   Yeah, I think you are correct. We should always send leadership changes 
to the broker handlers and update the cache accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956946



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956837



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -57,4 +60,9 @@
  */
 @Override
 void close();
+
+static  BatchReader singleton(Batch batch) {

Review comment:
   I removed this method. I think it is a dangerous method to have in 
`BatchReader` since batch readers created this way do not integrate with the 
rest of the `KafkaRaftClient`. This method was only used for tests so I just 
modified those cases to use `MemoryBatchReader` directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956073



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -202,7 +200,7 @@ public Builder setMetrics(ControllerMetrics 
controllerMetrics) {
 
 @SuppressWarnings("unchecked")
 public QuorumController build() throws Exception {
-if (logManager == null) {
+if (raftClient == null) {
 throw new RuntimeException("You must set a metadata log 
manager.");

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633956010



##
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -218,7 +226,7 @@ synchronized void electLeaderIfNeeded() {
 /**
  * The node ID of this local log manager. Each log manager must have a 
unique ID.
  */
-private final int nodeId;
+public final int nodeId;

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633940297



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2261,6 +2260,11 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 return shutdownComplete;
 }
 
+@Override
+public void resign(int epoch) {
+throw new UnsupportedOperationException();

Review comment:
   Yes. We have an issue for this: 
https://issues.apache.org/jira/browse/KAFKA-12631




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12803) Support reassigning partitions when in KRaft mode

2021-05-17 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12803:


 Summary: Support reassigning partitions when in KRaft mode
 Key: KAFKA-12803
 URL: https://issues.apache.org/jira/browse/KAFKA-12803
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 2.8.0
Reporter: Colin McCabe
Assignee: Colin McCabe
 Fix For: 3.0.0


Support reassigning partitions when in KRaft mode



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12788) Improve KRaft replica placement

2021-05-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12788:
-
Affects Version/s: 2.8.0

> Improve KRaft replica placement
> ---
>
> Key: KAFKA-12788
> URL: https://issues.apache.org/jira/browse/KAFKA-12788
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Implement the existing Kafka replica placement algorithm for KRaft.
> This also means implementing rack awareness. Previously, we just chose
> replicas randomly in a non-rack-aware fashion. Also, allow replicas to
> be placed on fenced brokers if there are no other choices. This was
> specified in KIP-631 but previously not implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12788) Improve KRaft replica placement

2021-05-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12788.
--
Fix Version/s: 3.0.0
 Reviewer: Jun Rao
   Resolution: Fixed

> Improve KRaft replica placement
> ---
>
> Key: KAFKA-12788
> URL: https://issues.apache.org/jira/browse/KAFKA-12788
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Implement the existing Kafka replica placement algorithm for KRaft.
> This also means implementing rack awareness. Previously, we just chose
> replicas randomly in a non-rack-aware fashion. Also, allow replicas to
> be placed on fenced brokers if there are no other choices. This was
> specified in KIP-631 but previously not implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-17 Thread GitBox


cmccabe merged pull request #10494:
URL: https://github.com/apache/kafka/pull/10494


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633935466



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -56,24 +57,17 @@
 void handleSnapshot(SnapshotReader reader);
 
 /**

Review comment:
   > Or you could choose to burn an epoch like this...
   
   This is not how leader election works in Raft. When a leader fails or steps 
down an epoch starts without a leader, only candidate(s). If leader election 
succeeds for a given epoch only one leader is guaranteed to be elected for that 
epoch and will remain leader for the duration of that epoch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633935466



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -56,24 +57,17 @@
 void handleSnapshot(SnapshotReader reader);
 
 /**

Review comment:
   > Or you could choose to burn an epoch like this...
   
   This is not how leader election works in Raft. When a leader fails or steps 
down an epoch starts without a leader, only candidate(s). If leader election 
succeeds for a given epoch only one leader is guaranteed to elect for that 
epoch and will remain leader for the duration of that epoch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633930529



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -56,24 +57,17 @@
 void handleSnapshot(SnapshotReader reader);
 
 /**

Review comment:
   I'll update the documentation but in Raft epochs are not guarantee to 
have a leader. If there is a leader for an epoch then there is one and only one 
leader. So that means that the client could see.
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
   ```
   Or this for that matter
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=2))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r633929510



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+/**
+ * Test that the BrokerList class works as expected.
+ */
+@Test
+public void testBrokerList() {
+assertEquals(0, BrokerList.EMPTY.size());
+assertEquals(-1, BrokerList.EMPTY.next(1));
+BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+assertEquals(4, brokers.size());
+assertEquals(0, brokers.next(0));
+assertEquals(1, brokers.next(0));
+assertEquals(2, brokers.next(0));
+assertEquals(3, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(1, brokers.next(1));
+assertEquals(2, brokers.next(1));
+assertEquals(3, brokers.next(1));
+assertEquals(0, brokers.next(1));
+assertEquals(-1, brokers.next(1));
+}
+
+/**
+ * Test that we perform striped replica placement as expected, and don't 
use the
+ * fenced replica if we don't have to.
+ */
+@Test
+public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(0, Optional.empty(), false),
+new UsableBroker(4, Optional.empty(), false),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(5, rackList.numTotalBrokers());
+assertEquals(4, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(0));
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(-1));
+assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+}
+
+/**
+ * Test that we will place on the fenced replica if we need to.
+ */
+@Test
+public void testPlacementOnFencedReplicaOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(3, rackList.numTotalBrokers());
+assertEquals(2, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+}
+
+@Test
+public void 

[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  
Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple 
case where
+ * broker racks have not been configured, this goal is a no-op, of course.  
But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we 
are placing
+ * multiple partitions, we try to avoid putting each partition on the same set 
of
+ * replicas, even if it does satisfy the rack placement goal.  However, we 
treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 
brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions 
on that one
+ * broker in rack C.  If you were to place a lot of partitions with 
replication factor 3,
+ * each partition would try to get a replica there.  In general racks are 
supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than 
on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, 
these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot 
be
+ * satisfied.  The first constraint is that we can't place more than one 
replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for 
example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint 
comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an 
unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to 
create
+ * new topics even if every broker were fenced.  However, this would be 
confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack 
objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted 
list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, 
stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin 
fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with 
replication
+ * factor 3, our first replica might come from A, our second from B, and our 
third from C.
+ * Of course our placement would not be very fair if we always started with 
rack A.
+ * Therefore, we generate a random starting offset when the RackList is 
created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a 
rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica 
A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, 
and so on.
+ * Just like with the racks, we add a random 

[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  
Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple 
case where
+ * broker racks have not been configured, this goal is a no-op, of course.  
But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we 
are placing
+ * multiple partitions, we try to avoid putting each partition on the same set 
of
+ * replicas, even if it does satisfy the rack placement goal.  However, we 
treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 
brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions 
on that one
+ * broker in rack C.  If you were to place a lot of partitions with 
replication factor 3,
+ * each partition would try to get a replica there.  In general racks are 
supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than 
on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, 
these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot 
be
+ * satisfied.  The first constraint is that we can't place more than one 
replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for 
example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint 
comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an 
unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to 
create
+ * new topics even if every broker were fenced.  However, this would be 
confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack 
objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted 
list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, 
stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin 
fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with 
replication
+ * factor 3, our first replica might come from A, our second from B, and our 
third from C.
+ * Of course our placement would not be very fair if we always started with 
rack A.
+ * Therefore, we generate a random starting offset when the RackList is 
created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a 
rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica 
A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, 
and so on.
+ * Just like with the racks, we add a random 

[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633909171



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -209,136 +196,6 @@ public void testUnregisterBroker() throws Throwable {
 }
 }
 
-static class MockSnapshotWriterBuilder implements Function {

Review comment:
   Well, as I said in the other comments... I'd really like to find a way 
to keep this test working before we merge this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633900447



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##
@@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int,
 clientQuotaManager.handleQuotaRecord(record)
   }
 
-  class HandleNewLeaderEvent(leader: MetaLogLeader)
+  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
   extends EventQueue.FailureLoggingEvent(log) {
 override def run(): Unit = {
   val imageBuilder =
 MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-  if (leader.nodeId() < 0) {
-imageBuilder.controllerId(None)
-  } else {
-imageBuilder.controllerId(Some(leader.nodeId()))
-  }
+  imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
   metadataCache.image(imageBuilder.build())
 }
   }
 
-  override def handleNewLeader(leader: MetaLogLeader): Unit = {
-eventQueue.append(new HandleNewLeaderEvent(leader))
+  override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+if (leader.isLeader(brokerId)) {
+  eventQueue.append(new HandleNewLeaderEvent(leader))

Review comment:
   This doesn't seem correct We need to know the controller ID even if 
it's not the same as our broker ID.
   
   (It's not even clear that we will always have a broker with the same ID as 
any controller, since their ID space is separate.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633918245



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -936,21 +944,13 @@ private QuorumController(LogContext logContext,
 this.replicationControl = new 
ReplicationControlManager(snapshotRegistry,
 logContext, defaultReplicationFactor, defaultNumPartitions,
 configurationControl, clusterControl, controllerMetrics);
-this.logManager = logManager;
+this.raftClient = raftClient;
 this.metaLogListener = new QuorumMetaLogListener();
-this.curClaimEpoch = -1L;
-this.lastCommittedOffset = snapshotReader.epoch();
+this.curClaimEpoch = -1;
+this.lastCommittedOffset = -1L;
 this.writeOffset = -1L;
 
-while (snapshotReader.hasNext()) {
-List batch = snapshotReader.next();
-long index = 0;
-for (ApiMessage message : batch) {
-replay(message, snapshotReader.epoch(), index++);
-}
-}
-snapshotRegistry.createSnapshot(lastCommittedOffset);

Review comment:
   Hmm... I would rather not remove this code if possible. In another 
comment I suggested moving it to the snapshot load function that you also 
added. That would also allow the associated test to keep working.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12802) Add a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.

2021-05-17 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-12802:
--

 Summary: Add a file based cache for consumed remote log metadata 
for each partition to avoid consuming again incase of broker restarts.
 Key: KAFKA-12802
 URL: https://issues.apache.org/jira/browse/KAFKA-12802
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana
 Fix For: 3.0.0


Add a file based cache for consumed remote log metadata for each partition to 
avoid consuming again in case of broker restarts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9555) Topic-based implementation for the RemoteLogMetadataManager

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9555:
--
Description: 
The purpose of this task is to implement a {{RemoteLogMetadataManager}} based 
on an internal topic in Kafka. More details ar mentioned in the 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-RemoteLogMetadataManagerimplementedwithaninternaltopic].

Done means:
 - Pull Request available for review and unit-tests.

System and integration tests are out of scope of this task and will be part of 
another task.

 

  was:
The purpose of this task is to implement a {{RemoteLogMetadataManager}} based 
on an internal topic in Kafka.

Done means:

- Pull Request available for review and unit-tests.

System and integration tests are out of scope of this task and will be part of 
another task.


> Topic-based implementation for the RemoteLogMetadataManager
> ---
>
> Key: KAFKA-9555
> URL: https://issues.apache.org/jira/browse/KAFKA-9555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> The purpose of this task is to implement a {{RemoteLogMetadataManager}} based 
> on an internal topic in Kafka. More details ar mentioned in the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-RemoteLogMetadataManagerimplementedwithaninternaltopic].
> Done means:
>  - Pull Request available for review and unit-tests.
> System and integration tests are out of scope of this task and will be part 
> of another task.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633917057



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -56,24 +57,17 @@
 void handleSnapshot(SnapshotReader reader);
 
 /**

Review comment:
   It's fine to combine these APIs, but we need to document what happens if 
the current leader resigns, but we don't know who the new leader is yet. Do we 
get passed a LeaderAndEpoch with the current epoch + 1 and a node ID of -1? If 
so, do we then expect to see another LeaderAndEpoch with the current epoch + 1 
and a valid node -1?
   
   In other words, let's say node 0 is the leader and then resigns, and then 
node 1 becomes the leader. Does it look like this:
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
   ```
   
   Or would you rather have something like this?
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=1))
   ```
   
   It seems like the second one will break a lot of invariants, so probably 
should be avoided. The first one might break some invariants too, though. We'd 
have to look.
   
   Or you could choose to burn an epoch like this:
   ```
   handleLeaderChange(LeaderAndEpoch(nodeId=0, epoch=0))
   handleLeaderChange(LeaderAndEpoch(nodeId=-1, epoch=1))
   handleLeaderChange(LeaderAndEpoch(nodeId=1, epoch=2))
   ```
   Given that we only have a 31-bit epoch in the first place, that seems 
unwise, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633915001



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2261,6 +2260,11 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 return shutdownComplete;
 }
 
+@Override
+public void resign(int epoch) {
+throw new UnsupportedOperationException();

Review comment:
   This needs to be supported, because the controller will resign if it 
detects certain bugs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633914096



##
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -331,22 +343,40 @@ public void beginShutdown() {
 }
 
 @Override
-public void close() throws InterruptedException {
+public void close() {
 log.debug("Node {}: closing.", nodeId);
 beginShutdown();
-eventQueue.close();
+
+try {
+eventQueue.close();
+} catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public CompletableFuture shutdown(int timeoutMs) {

Review comment:
   It seems like the intention behind this `shutdown` API is to be 
non-blocking, but this implementation is not non-blocking. Maybe it would be 
good to add a comment about this?
   
   In general it is not possible to do a non-blocking thread join unless you 
have a third thread (not the calling thread, not the thread being shut down) 
which can wait for the blocking thread join operation to complete and then 
complete a future (or whatever).
   
   That's why there are two shutdown APIs in LocalLogManager: a non-blocking 
beginShutdown and a blocking close which does all that, plus the thread join. 
This is a pattern that I use in other places as well. I think it's more useful 
than returning a future from close, due to the problem I mentioned above. It 
could be worth considering for RaftClient in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633912779



##
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -218,7 +226,7 @@ synchronized void electLeaderIfNeeded() {
 /**
  * The node ID of this local log manager. Each log manager must have a 
unique ID.
  */
-private final int nodeId;
+public final int nodeId;

Review comment:
   Data fields should not be public




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -624,50 +617,71 @@ public String toString() {
 return event.future();
 }
 
-class QuorumMetaLogListener implements MetaLogListener {
+class QuorumMetaLogListener implements 
RaftClient.Listener {
+
 @Override
-public void handleCommits(long offset, List messages) {
-appendControlEvent("handleCommits[" + offset + "]", () -> {
-if (curClaimEpoch == -1) {
-// If the controller is a standby, replay the records that 
were
-// created by the active controller.
-if (log.isDebugEnabled()) {
-if (log.isTraceEnabled()) {
-log.trace("Replaying commits from the active node 
up to " +
-"offset {}: {}.", offset, messages.stream().
-map(m -> 
m.toString()).collect(Collectors.joining(", ")));
+public void handleCommit(BatchReader reader) {
+appendControlEvent("handleCommits[baseOffset=" + 
reader.baseOffset() + "]", () -> {
+try {
+boolean isActiveController = curClaimEpoch != -1;
+while (reader.hasNext()) {
+Batch batch = reader.next();
+long offset = batch.lastOffset();
+List messages = batch.records();
+
+if (isActiveController) {
+// If the controller is active, the records were 
already replayed,
+// so we don't need to do it here.
+log.debug("Completing purgatory items up to offset 
{}.", offset);
+
+// Complete any events in the purgatory that were 
waiting for this offset.
+purgatory.completeUpTo(offset);
+
+// Delete all the in-memory snapshots that we no 
longer need.
+// If we are writing a new snapshot, then we need 
to keep that around;
+// otherwise, we should delete up to the current 
committed offset.
+snapshotRegistry.deleteSnapshotsUpTo(
+Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+
 } else {
-log.debug("Replaying commits from the active node 
up to " +
-"offset {}.", offset);
+// If the controller is a standby, replay the 
records that were
+// created by the active controller.
+if (log.isDebugEnabled()) {
+if (log.isTraceEnabled()) {
+log.trace("Replaying commits from the 
active node up to " +
+"offset {}: {}.", offset, 
messages.stream()
+.map(ApiMessageAndVersion::toString)
+.collect(Collectors.joining(", ")));
+} else {
+log.debug("Replaying commits from the 
active node up to " +
+"offset {}.", offset);
+}
+}
+for (ApiMessageAndVersion messageAndVersion : 
messages) {
+replay(messageAndVersion.message(), -1, 
offset);
+}
 }
+lastCommittedOffset = offset;
 }
-for (ApiMessage message : messages) {
-replay(message, -1, offset);
-}
-} else {
-// If the controller is active, the records were already 
replayed,
-// so we don't need to do it here.
-log.debug("Completing purgatory items up to offset {}.", 
offset);
-
-// Complete any events in the purgatory that were waiting 
for this offset.
-purgatory.completeUpTo(offset);
-
-// Delete all the in-memory snapshots that we no longer 
need.
-// If we are writing a new snapshot, then we need to keep 
that around;
-// otherwise, we should delete up to the current committed 
offset.
-snapshotRegistry.deleteSnapshotsUpTo(
-Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+} finally {
+reader.close();
 }
-   

[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -624,50 +617,71 @@ public String toString() {
 return event.future();
 }
 
-class QuorumMetaLogListener implements MetaLogListener {
+class QuorumMetaLogListener implements 
RaftClient.Listener {
+
 @Override
-public void handleCommits(long offset, List messages) {
-appendControlEvent("handleCommits[" + offset + "]", () -> {
-if (curClaimEpoch == -1) {
-// If the controller is a standby, replay the records that 
were
-// created by the active controller.
-if (log.isDebugEnabled()) {
-if (log.isTraceEnabled()) {
-log.trace("Replaying commits from the active node 
up to " +
-"offset {}: {}.", offset, messages.stream().
-map(m -> 
m.toString()).collect(Collectors.joining(", ")));
+public void handleCommit(BatchReader reader) {
+appendControlEvent("handleCommits[baseOffset=" + 
reader.baseOffset() + "]", () -> {
+try {
+boolean isActiveController = curClaimEpoch != -1;
+while (reader.hasNext()) {
+Batch batch = reader.next();
+long offset = batch.lastOffset();
+List messages = batch.records();
+
+if (isActiveController) {
+// If the controller is active, the records were 
already replayed,
+// so we don't need to do it here.
+log.debug("Completing purgatory items up to offset 
{}.", offset);
+
+// Complete any events in the purgatory that were 
waiting for this offset.
+purgatory.completeUpTo(offset);
+
+// Delete all the in-memory snapshots that we no 
longer need.
+// If we are writing a new snapshot, then we need 
to keep that around;
+// otherwise, we should delete up to the current 
committed offset.
+snapshotRegistry.deleteSnapshotsUpTo(
+Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+
 } else {
-log.debug("Replaying commits from the active node 
up to " +
-"offset {}.", offset);
+// If the controller is a standby, replay the 
records that were
+// created by the active controller.
+if (log.isDebugEnabled()) {
+if (log.isTraceEnabled()) {
+log.trace("Replaying commits from the 
active node up to " +
+"offset {}: {}.", offset, 
messages.stream()
+.map(ApiMessageAndVersion::toString)
+.collect(Collectors.joining(", ")));
+} else {
+log.debug("Replaying commits from the 
active node up to " +
+"offset {}.", offset);
+}
+}
+for (ApiMessageAndVersion messageAndVersion : 
messages) {
+replay(messageAndVersion.message(), -1, 
offset);
+}
 }
+lastCommittedOffset = offset;
 }
-for (ApiMessage message : messages) {
-replay(message, -1, offset);
-}
-} else {
-// If the controller is active, the records were already 
replayed,
-// so we don't need to do it here.
-log.debug("Completing purgatory items up to offset {}.", 
offset);
-
-// Complete any events in the purgatory that were waiting 
for this offset.
-purgatory.completeUpTo(offset);
-
-// Delete all the in-memory snapshots that we no longer 
need.
-// If we are writing a new snapshot, then we need to keep 
that around;
-// otherwise, we should delete up to the current committed 
offset.
-snapshotRegistry.deleteSnapshotsUpTo(
-Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+} finally {
+reader.close();
 }
-   

[jira] [Assigned] (KAFKA-9578) Kafka Tiered Storage - System Tests

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-9578:
-

Assignee: Kamal Chandraprakash  (was: Alexandre Dupriez)

> Kafka Tiered Storage - System  Tests
> 
>
> Key: KAFKA-9578
> URL: https://issues.apache.org/jira/browse/KAFKA-9578
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Harsha
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Initial test cases set up by [~Ying Zheng] 
>  
> [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9579) RLM fetch implementation by adding respective purgatory

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9579:
--
Fix Version/s: 3.0.0

> RLM fetch implementation by adding respective purgatory
> ---
>
> Key: KAFKA-9579
> URL: https://issues.apache.org/jira/browse/KAFKA-9579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Ying Zheng
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9578) Kafka Tiered Storage - System Tests

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9578:
--
Fix Version/s: 3.0.0

> Kafka Tiered Storage - System  Tests
> 
>
> Key: KAFKA-9578
> URL: https://issues.apache.org/jira/browse/KAFKA-9578
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Harsha
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.0.0
>
>
> Initial test cases set up by [~Ying Zheng] 
>  
> [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9990) Supporting transactions in tiered storage

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9990:
--
Fix Version/s: 3.0.0

> Supporting transactions in tiered storage
> -
>
> Key: KAFKA-9990
> URL: https://issues.apache.org/jira/browse/KAFKA-9990
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7739) Kafka Tiered Storage

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-7739:
-

Assignee: Satish Duggana  (was: Harsha)

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12641) Clear RemoteLogLeaderEpochState entry when it become empty.

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-12641:
---
Fix Version/s: 3.0.0

> Clear RemoteLogLeaderEpochState entry when it become empty. 
> 
>
> Key: KAFKA-12641
> URL: https://issues.apache.org/jira/browse/KAFKA-12641
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> https://github.com/apache/kafka/pull/10218#discussion_r609895193



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9555) Topic-based implementation for the RemoteLogMetadataManager

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9555:
--
Fix Version/s: 3.0.0

> Topic-based implementation for the RemoteLogMetadataManager
> ---
>
> Key: KAFKA-9555
> URL: https://issues.apache.org/jira/browse/KAFKA-9555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> The purpose of this task is to implement a {{RemoteLogMetadataManager}} based 
> on an internal topic in Kafka.
> Done means:
> - Pull Request available for review and unit-tests.
> System and integration tests are out of scope of this task and will be part 
> of another task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9550) RemoteLogManager implementation

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9550:
--
Fix Version/s: 3.0.0

> RemoteLogManager implementation 
> 
>
> Key: KAFKA-9550
> URL: https://issues.apache.org/jira/browse/KAFKA-9550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> Implementation of RLM as mentioned in the HLD section of KIP-405
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7739) Kafka Tiered Storage

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7739:
--
Component/s: core

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Harsha
>Priority: Major
> Fix For: 3.0.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7739) Kafka Tiered Storage

2021-05-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7739:
--
Fix Version/s: 3.0.0

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Harsha
>Assignee: Harsha
>Priority: Major
> Fix For: 3.0.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633909171



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -209,136 +196,6 @@ public void testUnregisterBroker() throws Throwable {
 }
 }
 
-static class MockSnapshotWriterBuilder implements Function {

Review comment:
   I don't want to remove the snapshot handling code or tests that we have 
for the controller. We should find a way to keep this test working before we 
merge this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633906477



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 

Review comment:
   Also, I just realized that all the data fields in here are public. Can 
we fix that? In Java we use public accessors, not public data fields, unless 
there's some really exceptional reason not to.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633905804



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -202,7 +200,7 @@ public Builder setMetrics(ControllerMetrics 
controllerMetrics) {
 
 @SuppressWarnings("unchecked")
 public QuorumController build() throws Exception {
-if (logManager == null) {
+if (raftClient == null) {
 throw new RuntimeException("You must set a metadata log 
manager.");

Review comment:
   Since we're now setting a raft client rather than metadata log manager, 
the exception message should be updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633904989



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -57,4 +60,9 @@
  */
 @Override
 void close();
+
+static  BatchReader singleton(Batch batch) {

Review comment:
   Can you add JavaDoc for this new method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 

Review comment:
   Hmm... 
   
   * What do you think of using `nodeId` rather than `leaderId`?

   * There should be an accessor function to get the `nodeId` out.
   
   * If we really need a shortcut method for comparing against specific node 
IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 

Review comment:
   Hmm... 
   
   * `LeaderAndEpoch#leaderId` seems redundant. Any ID inside the leader object 
is a leader id by definition.  `LeaderAndEpoch#nodeId` expresses what we mean 
more clearly.
   
   * There should be an accessor function to get the `nodeId` out.
   
   * If we really need a shortcut method for comparing against specific node 
IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633903456



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 

Review comment:
   Hmm... 
   
   * `Leader#leaderId` seems redundant. Any ID inside the leader object is a 
leader id by definition.  `Leader#nodeId` expresses what we mean more clearly.
   
   * There should be an accessor function to get the `nodeId` out.
   
   * If we really need a shortcut method for comparing against specific node 
IDs like this, we could call it `LeaderAndEpoch#hasNodeId(int nodeId)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633900447



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##
@@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int,
 clientQuotaManager.handleQuotaRecord(record)
   }
 
-  class HandleNewLeaderEvent(leader: MetaLogLeader)
+  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
   extends EventQueue.FailureLoggingEvent(log) {
 override def run(): Unit = {
   val imageBuilder =
 MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-  if (leader.nodeId() < 0) {
-imageBuilder.controllerId(None)
-  } else {
-imageBuilder.controllerId(Some(leader.nodeId()))
-  }
+  imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
   metadataCache.image(imageBuilder.build())
 }
   }
 
-  override def handleNewLeader(leader: MetaLogLeader): Unit = {
-eventQueue.append(new HandleNewLeaderEvent(leader))
+  override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+if (leader.isLeader(brokerId)) {
+  eventQueue.append(new HandleNewLeaderEvent(leader))

Review comment:
   This is not correct. We need to know the controller ID even if it's not 
the same as our broker ID.
   
   (It's not even clear that we will always have a broker with the same ID as 
any controller, since their ID space is separate.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-17 Thread GitBox


bbejeck commented on pull request #10697:
URL: https://github.com/apache/kafka/pull/10697#issuecomment-842656430


   merged #10697 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-17 Thread GitBox


bbejeck merged pull request #10697:
URL: https://github.com/apache/kafka/pull/10697


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-17 Thread GitBox


ableegoldman commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633866937



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 throw new IllegalArgumentException(String.format("Trying to 
register store %s that is not a known global store", store.name()));
 }
 
+// register the store first, so that if later an exception is thrown 
then eventually while we call `close`

Review comment:
   +1 on disallowing the app to continue after an illegal exception. We 
need to reserve _some_ kind of exception for actual critical, fatal system 
errors that a user can't just ignore to spin up a new thread. And that has 
essentially been the meaning of these illegal exceptions in Streams thus far. 
As I mentioned in another thread, I've been very concerned about this in the 
new handler since we haven't been strict in properly cleaning up after an 
illegal exception




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on a change in pull request #10712: KAFKA-12798: Fixing MM2 rebalance timeout issue when source cluster is not available

2021-05-17 Thread GitBox


ryannedolan commented on a change in pull request #10712:
URL: https://github.com/apache/kafka/pull/10712#discussion_r63387



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -198,6 +198,10 @@
 + " properties to replicate.";
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
+private static final String 
SOURCE_CLUSTER_START_TASK_TIMEOUT_MILLISECOND_CONFIG = 
"source.cluster.start.task.timeout";
+private static final String 
SOURCE_CLUSTER_START_TASK_TIMEOUT_MILLISECOND_DOC = "Milliseconds to wait for 
tasks that affects source cluster at startup";

Review comment:
   Can we use the existing admin.timeout.ms for this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-17 Thread GitBox


guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633858383



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 throw new IllegalArgumentException(String.format("Trying to 
register store %s that is not a known global store", store.name()));
 }
 
+// register the store first, so that if later an exception is thrown 
then eventually while we call `close`

Review comment:
   cc @ableegoldman @wcarlson5 @rodesai too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-17 Thread GitBox


guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633858091



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 throw new IllegalArgumentException(String.format("Trying to 
register store %s that is not a known global store", store.name()));
 }
 
+// register the store first, so that if later an exception is thrown 
then eventually while we call `close`

Review comment:
   Okay, I think I got what we were discussing now. Originally I'm thinking 
that since these conditions should never happen --- because in the topology 
when we `add state stores` we already check if the store names have existed or 
not, and hence we should never add two stores with the same name --- if it ever 
happens we would always treat it as fatal and crash stop immediately.
   
   On the higher level, I think we should NOT allow users to handle illegal-s/a 
themselves and hence ever possibly to treat them not as fatal, but obviously 
today we do not enforce that.
   
   So I think we can have two options here: 1) in the lower level hierarchy 
like state manager here, try to stop the stores when hitting an illegal-s/a; 2) 
on the higher level hierarchy as in stream thread, we enforce "stop app" on 
illegal-s/a. I'm a bit leaning towards 2) here but would love to hear other 
opinions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-17 Thread GitBox


ableegoldman commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633854799



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
-globalStoreNames.add(stateStore.name());
+for (final StateStore stateStore : topology.globalStateStores()) {
 final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
 changelogTopics.add(sourceTopic);
 stateStore.init((StateStoreContext) globalProcessorContext, 
stateStore);

Review comment:
   Yeah, our attitude towards IllegalStateException has been pretty 
cavalier thus far, and it's one of the main things I'm concerned about with the 
REPLACE thread functionality. We should definitely be on the lookout for 
possible IllegalStateException occurrences in the codebase and try to triage 
them so things aren't just completely screwed up if Streams is allowed to 
continue after hitting one




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-17 Thread GitBox


guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633852084



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
-globalStoreNames.add(stateStore.name());
+for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
   Yes, but the reason is that, in the unit test we do not really follow 
the trace of `stateMgr.initialize() -> store.init() -> context.registerStore() 
-> stateMgr.registerStore()`. That's because the `context` is a mock, which 
does not use the `stateMgr` at all, and hence the `stores` set is always empty.
   
   If we do want to test this call trace, then we need to make the mock context 
to get the actual stateMgr.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata

2021-05-17 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12781:
---
Affects Version/s: (was: 2.8.0)

> Improve the endOffsets accuracy in TaskMetadata 
> 
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the 
> main consumer in streams. It should be possible to get the highest offset in 
> the topic via the consumer instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata

2021-05-17 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346423#comment-17346423
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12781:


Ah, that's my bad – I was discussing this with Walker since I had also 
misunderstood the meaning of the current endOffsets() API, and recommended that 
he use the new API from John's KIP but then took it back as I thought we had 
reverted the new API in that task idling KIP. But I guess we only reverted some 
of the changes, of course we would still have required the currentLag().

In that case I agree with Guozhang, we can just knock this out right away with 
a small PR to leverage the currentLag() API. I'm putting the fix version as 3.0 
since it may not be critical to address this ASAP, but since it's an easy fix 
we should get to it by the time this is released. That will also help with the 
confusion around what endOffsets() means, since if both Guozhang and I 
misunderstood it then I'm guessing some users will too.

> Improve the endOffsets accuracy in TaskMetadata 
> 
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Minor
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the 
> main consumer in streams. It should be possible to get the highest offset in 
> the topic via the consumer instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12781) Improve the endOffsets accuracy in TaskMetadata

2021-05-17 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12781:
---
Fix Version/s: 3.0.0

> Improve the endOffsets accuracy in TaskMetadata 
> 
>
> Key: KAFKA-12781
> URL: https://issues.apache.org/jira/browse/KAFKA-12781
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently `TaskMetadata#endOffsets()` returns the highest offset seen by the 
> main consumer in streams. It should be possible to get the highest offset in 
> the topic via the consumer instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-17 Thread GitBox


bbejeck commented on pull request #10697:
URL: https://github.com/apache/kafka/pull/10697#issuecomment-842616336


   This PR only touched a python system test file, failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-17 Thread GitBox


rondagostino commented on pull request #10697:
URL: https://github.com/apache/kafka/pull/10697#issuecomment-842602694


   Thanks, @bbejeck .  Test passed: 
https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-05-17--001.1621281781--rondagostino--systest_add_cluster_annotation_streams_test--b20e203a0/report.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-17 Thread GitBox


bbejeck commented on pull request #10697:
URL: https://github.com/apache/kafka/pull/10697#issuecomment-842583178


   @rondagostino can we run a branch builder for this PR as a sanity check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-05-17 Thread GitBox


gharris1727 commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r633794872



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+public static final Logger log = 
LoggerFactory.getLogger(SynchronizationTest.class);
+
+@Rule
+public final TestName testName = new TestName();
+
+private String threadPrefix;
+private Plugins plugins;
+private ThreadPoolExecutor exec;
+private Breakpoint dclBreakpoint;
+private Breakpoint pclBreakpoint;
+
+@Before
+public void setup() {
+TestPlugins.assertAvailable();
+Map pluginProps = Collections.singletonMap(
+WorkerConfig.PLUGIN_PATH_CONFIG,
+String.join(",", TestPlugins.pluginPath())
+);
+threadPrefix = SynchronizationTest.class.getSimpleName()
++ "." + testName.getMethodName() + "-";
+dclBreakpoint = new Breakpoint<>();
+pclBreakpoint = new Breakpoint<>();
+plugins = new Plugins(pluginProps) {
+@Override
+protected DelegatingClassLoader 
newDelegatingClassLoader(List paths) {
+return AccessController.doPrivileged(
+(PrivilegedAction) () ->
+new SynchronizedDelegatingClassLoader(paths)
+);
+}
+};
+exec = new ThreadPoolExecutor(
+2,
+2,
+1000L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingDeque<>(),
+threadFactoryWithNamedThreads(threadPrefix)
+);
+
+}
+
+@After
+public void tearDown() throws InterruptedException {
+dclBreakpoint.clear();
+pclBreakpoint.clear();
+exec.shutdown();
+exec.awaitTermination(1L, TimeUnit.SECONDS);
+}
+
+private static class Breakpoint {
+
+private Predicate predicate;
+private CyclicBarrier barrier;
+
+public synchronized void clear() {
+if (barrier != null) {
+barrier.reset();
+}
+predicate = null;
+barrier = null;
+}
+
+public synchronized void set(Predicate predicate) {
+clear();
+this.predicate = predicate;
+// As soon as the barrier is tripped, the barrier will be reset 
for the next round.
+barrier = new CyclicBarrier(2);
+}
+
+/**
+ * From a thread under test, await for the test orchestrator to 
continue execution
+ * @param obj Object to test 

[GitHub] [kafka] gharris1727 commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-05-17 Thread GitBox


gharris1727 commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r633780467



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -48,25 +48,25 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 public class DelegatingClassLoader extends URLClassLoader {
 private static final Logger log = 
LoggerFactory.getLogger(DelegatingClassLoader.class);
 private static final String CLASSPATH_NAME = "classpath";
 private static final String UNDEFINED_VERSION = "undefined";
 
-private final Map, ClassLoader>> 
pluginLoaders;
-private final Map aliases;
+private final ConcurrentMap, ClassLoader>> 
pluginLoaders;
+private final ConcurrentMap aliases;

Review comment:
   This was added by @kkonstantine but it does seem to be non-functional. 
I'll revert the type change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-05-17 Thread GitBox


rhauch commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r633778524



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+public static final Logger log = 
LoggerFactory.getLogger(SynchronizationTest.class);
+
+@Rule
+public final TestName testName = new TestName();
+
+private String threadPrefix;
+private Plugins plugins;
+private ThreadPoolExecutor exec;
+private Breakpoint dclBreakpoint;
+private Breakpoint pclBreakpoint;
+
+@Before
+public void setup() {
+TestPlugins.assertAvailable();
+Map pluginProps = Collections.singletonMap(
+WorkerConfig.PLUGIN_PATH_CONFIG,
+String.join(",", TestPlugins.pluginPath())
+);
+threadPrefix = SynchronizationTest.class.getSimpleName()
++ "." + testName.getMethodName() + "-";
+dclBreakpoint = new Breakpoint<>();
+pclBreakpoint = new Breakpoint<>();
+plugins = new Plugins(pluginProps) {
+@Override
+protected DelegatingClassLoader 
newDelegatingClassLoader(List paths) {
+return AccessController.doPrivileged(
+(PrivilegedAction) () ->
+new SynchronizedDelegatingClassLoader(paths)
+);
+}
+};
+exec = new ThreadPoolExecutor(
+2,
+2,
+1000L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingDeque<>(),
+threadFactoryWithNamedThreads(threadPrefix)
+);
+
+}
+
+@After
+public void tearDown() throws InterruptedException {
+dclBreakpoint.clear();
+pclBreakpoint.clear();
+exec.shutdown();
+exec.awaitTermination(1L, TimeUnit.SECONDS);
+}
+
+private static class Breakpoint {
+
+private Predicate predicate;
+private CyclicBarrier barrier;
+
+public synchronized void clear() {
+if (barrier != null) {
+barrier.reset();
+}
+predicate = null;
+barrier = null;
+}
+
+public synchronized void set(Predicate predicate) {
+clear();
+this.predicate = predicate;
+// As soon as the barrier is tripped, the barrier will be reset 
for the next round.
+barrier = new CyclicBarrier(2);
+}
+
+/**
+ * From a thread under test, await for the test orchestrator to 
continue execution
+ * @param obj Object to test with 

[GitHub] [kafka] cmccabe commented on pull request #10696: KAFKA-12777 Refactor and cleanup AutoTopicCreationManager

2021-05-17 Thread GitBox


cmccabe commented on pull request #10696:
URL: https://github.com/apache/kafka/pull/10696#issuecomment-842547338


   (Copying offline discussion to github)
   
   I think creating a base class and using implementation inheritance is worse 
than the current code. Looking at the code, I see very little commonality 
between the ZK implementation and the API-based implementation. I think we 
should just have an interface and two separate implementations.
   
   The main code that we'd want to share between the two implementations is the 
routine that checks if topics have the wrong names. That's a very small amount 
of code which could simply be in a static method. If you want you could combine 
that with checking if a change is "in-flight" by passing in a map from topics 
to booleans, or similar. Or just have two separate maps in the separate 
implementations.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r633769533



##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##
@@ -139,10 +140,16 @@ public TopicIdPartition next() {
  * Partitions with no isr members appear in this map under id NO_LEADER.
  */
 private final TimelineHashMap> 
isrMembers;
+
+private final Map offlinePartitionCounts;

Review comment:
   All the information that is needed is already here.

##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##
@@ -139,10 +140,16 @@ public TopicIdPartition next() {
  * Partitions with no isr members appear in this map under id NO_LEADER.
  */
 private final TimelineHashMap> 
isrMembers;
+
+private final Map offlinePartitionCounts;

Review comment:
   All the information that is needed is already here. If you delete X 
partitions that had a leader of -1, you decrement the counter by X.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0

2021-05-17 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346378#comment-17346378
 ] 

Josep Prat commented on KAFKA-12419:


Tomorrow I'll create the tickets for the Kafka Streams module, I'll do one per 
method / class and do my best to find out if the tickets already exist

> Remove Deprecated APIs of Kafka Streams in 3.0
> --
>
> Key: KAFKA-12419
> URL: https://issues.apache.org/jira/browse/KAFKA-12419
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, streams-test-utils
>Reporter: Guozhang Wang
>Assignee: Tomasz Nguyen
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Here's a list of deprecated APIs that we have accumulated in the past, we can 
> consider removing them in 3.0:
> * KIP-198: "--zookeeper" flag from StreamsResetter (1.0)
> * KIP-171: "–execute" flag from StreamsResetter (1.1)
> * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1)
> * KIP-251: overloaded "ProcessorContext#forward" (2.0)
> * KIP-276: "StreamsConfig#getConsumerConfig" (2.0)
> * KIP-319: "WindowBytesStoreSupplier#segments" (2.1)
> * KIP-321: "TopologyDescription.Source#topics" (2.1)
> * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1)
> * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1)
> * KIP-365/366: Implicit Scala Apis (2.1)
> * KIP-372: overloaded "KStream#groupBy" (2.1)
> * KIP-307: "Joined#named" (2.3)
> * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3)
> * KIP-429: "PartitionAssignor" interface (2.4)
> * KIP-470: "TopologyTestDriver#pipeInput" (2.4)
> * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4)
> * KIP-479: overloaded "KStream#join" (2.4)
> * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5)
> * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and 
> "KafkaStreams#store" (2.5)
> And here's a list of already filed JIRAs for removing deprecated APIs
> * KAFKA-10434
> * KAFKA-7785
> * KAFKA-12796



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #10633: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests

2021-05-17 Thread GitBox


rajinisivaram merged pull request #10633:
URL: https://github.com/apache/kafka/pull/10633


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #10561: KAFKA-12686 AlterIsr and LeaderAndIsr race condition

2021-05-17 Thread GitBox


mumrah commented on pull request #10561:
URL: https://github.com/apache/kafka/pull/10561#issuecomment-842540233


   @chia7712 or @cmccabe let me know if you have anymore feedback or questions. 
I'd like to get this one merged 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #10633: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests

2021-05-17 Thread GitBox


rajinisivaram commented on pull request #10633:
URL: https://github.com/apache/kafka/pull/10633#issuecomment-842540743


   @mumrah Thanks for the review. Test failures not related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-17 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633749112



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
 case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request)
 case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
   We might want the controller to process DescribeAcls for debug purposes. 
 There's no reason to artificially disable it from processing the RPC, although 
I agree that it will normally not be used.

##
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
   (See above comment)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10714: MINOR: add ConfigUtils method for printing configurations

2021-05-17 Thread GitBox


cmccabe opened a new pull request #10714:
URL: https://github.com/apache/kafka/pull/10714


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-17 Thread GitBox


ableegoldman commented on pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#issuecomment-842500319


   One unrelated failure in RaftClusterTest, otherwise tests passed. Merging to 
trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-17 Thread GitBox


guozhangwang commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r633174817



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -237,13 +237,15 @@ private static boolean isRecoverable(final KafkaException 
uncaughtException) {
  * @throws TaskMigratedException
  */
 protected void commitTransaction(final Map offsets,
-   final ConsumerGroupMetadata consumerGroupMetadata) {
+ final ConsumerGroupMetadata 
consumerGroupMetadata) {
 if (!eosEnabled()) {
 throw new IllegalStateException(formatException("Exactly-once is 
not enabled"));
 }
 maybeBeginTransaction();
 try {
-producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
+// Older brokers don't understand any group metadata beyond the 
group id, thus we must downgrade the request for eos-v1

Review comment:
   nit: maybe clarify a bit better here, e.g. "EXACTLY_ONCE_V2 on the 
Streams config assumes that broker is at least 2.5 and hence understand ... ; 
if it is smaller than V2, then the brokers may be on older versions and hence 
..."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade

2021-05-17 Thread GitBox


mumrah commented on pull request #10694:
URL: https://github.com/apache/kafka/pull/10694#issuecomment-842509752


   Did a `--repeat 4` run of `security_rolling_upgrade_test.py` and everything 
passed https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4514/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10707: KAFKA-12792: Fix metrics bug and introduce TimelineInteger

2021-05-17 Thread GitBox


cmccabe commented on pull request #10707:
URL: https://github.com/apache/kafka/pull/10707#issuecomment-842490006


   Thanks, @mumrah .
   
   > Should we include a comment about the thread safety of these timeline 
classes?
   
   Yeah.  I added a comment about the classes requiring external synchronization


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json

2021-05-17 Thread GitBox


dongjinleekr commented on pull request #10709:
URL: https://github.com/apache/kafka/pull/10709#issuecomment-842297201


   Nice Catch. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] NLincoln commented on pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json

2021-05-17 Thread GitBox


NLincoln commented on pull request #10709:
URL: https://github.com/apache/kafka/pull/10709#issuecomment-842404631


   @dajac done! https://issues.apache.org/jira/browse/KAFKA-12800
   
   I can't edit the assignee field of that ticket, but I can submit a patch for 
it tonight :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

2021-05-17 Thread GitBox


ableegoldman merged pull request #10542:
URL: https://github.com/apache/kafka/pull/10542


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`

2021-05-17 Thread GitBox


jlprat commented on pull request #10710:
URL: https://github.com/apache/kafka/pull/10710#issuecomment-842288466






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10711: MINOR: Update Scala to 2.13.6

2021-05-17 Thread GitBox


jlprat commented on pull request #10711:
URL: https://github.com/apache/kafka/pull/10711#issuecomment-842513321


   PR was created 5 hours ago and some jenkins jobs are still running, maybe 
there has been a problem with those jobs.
   Anyway, the failure reported is one of the known flaky tests 
(RaftClusterTest)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json

2021-05-17 Thread GitBox


dajac merged pull request #10709:
URL: https://github.com/apache/kafka/pull/10709


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah merged pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade

2021-05-17 Thread GitBox


mumrah merged pull request #10694:
URL: https://github.com/apache/kafka/pull/10694


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-17 Thread GitBox


rondagostino commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633107724



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
 case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request)
 case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
   When would the controller ever process `DescribeAcls`?  It seems to me 
that it never would because brokers don't forward that request.  And if that is 
correct, I think my change to `DescribeAclsRequest.json` to set `"listeners": 
["zkBroker", "broker", "controller"],` should instead be setting it to 
`"listeners": ["zkBroker", "broker"],`.

##
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
   As mentioned, I am wondering if this should be `listeners": ["zkBroker", 
"broker"],` since the KRaft broker never forward DescribeAcls to the controller.

##
File path: tests/kafkatest/tests/core/zookeeper_authorizer_test.py
##
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.security.kafka_acls import ACLs
+
+class ZooKeeperAuthorizerTest(Test):

Review comment:
   The only two system tests that leverage an authorizer are 
`ZooKeeperSecurityUpgradeTest` and `TestSecurityRollingUpgrade`.  The first is 
specific to ZooKeeper and does not apply to KRaft at all.  The second could be 
applied to KRaft because it tests changing the inter-broker security protocol 
across rolls rather than any broker version upgrade, but it has not yet been 
converted to the KRaft case, plus it does not remove ACLs (which we also want 
to test here).  Also, the authorizer piece at the end is not really the main 
part of that test.  It felt appropriate to create this simple, straightforward 
test for the specific use of the ZooKeeper-based authorizer.
   
   I've added a comment to this test (assuming we keep it) stating that we will 
test the KRaft replacement authorizer separately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-17 Thread GitBox


ableegoldman commented on pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#issuecomment-842508231


   Just the one flaky test 
`org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable()`
 which Luke is going to look into. This should be ready to merge if there are 
no further suggestions @ijuma  @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade

2021-05-17 Thread GitBox


rondagostino commented on pull request #10694:
URL: https://github.com/apache/kafka/pull/10694#issuecomment-842403412


   Thanks, for the review @mumrah!  Regarding your question:
   
   > Looks like we're checking client security protocol + sasl mechanism as 
well as the inter broker protocol + sasl mechanism. 
Do we need to do the same for "intercontroller_security_protocol" in KRaft 
mode?
   
   The answer is yes -- it's a good point.  I've opened 
https://issues.apache.org/jira/browse/KAFKA-12799 to extend the existing tests 
to apply to KRaft controllers, and I indicated in that ticket that we will have 
to take security config mutations into account for that implementation as we 
did here.  I hope it's okay that we defer this to when we get to that ticket.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] NLincoln commented on a change in pull request #10709: KAFKA-12794: Fix trailing json tokens in DescribeProducersRequest.json

2021-05-17 Thread GitBox


NLincoln commented on a change in pull request #10709:
URL: https://github.com/apache/kafka/pull/10709#discussion_r633535873



##
File path: 
clients/src/main/resources/common/message/DescribeProducersRequest.json
##
@@ -27,6 +27,5 @@
 { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
   "about": "The indexes of the partitions to list producers for." }
]}
-]}

Review comment:
   1e9d680e4225de93169cb357147723d7527b3503  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-17 Thread GitBox


ableegoldman merged pull request #10675:
URL: https://github.com/apache/kafka/pull/10675


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10708: MINOR: remove unnecessary `public` keyword from `Partitioner` interface

2021-05-17 Thread GitBox


mjsax merged pull request #10708:
URL: https://github.com/apache/kafka/pull/10708


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   3   >